【问题标题】:Scala / Spark: how to define serializable case classes (non-REPL)?Scala / Spark:如何定义可序列化的案例类(非REPL)?
【发布时间】:2020-04-03 22:36:48
【问题描述】:

Scala 新手在这里。 我在 Zeppelin 笔记本上写了一份 Spark 作业的草稿。我使用了 Datasets api,所以为了让我的案例类在执行 ds.map(s => MyCaseClass(...)) 时正确序列化,我在一个单独的笔记本单元格中定义了我的案例类。否则就不行了。

现在我正在编写一个实际的作业来在 Apache Airflow 中运行它。 主文件如下所示:

class MainObj {
   private val spark = SparkSession.builder()...getOrCreate()       
   import spark.implicits._

   case class MyCaseClass(...)

   def run() {       
      spark.read
      ...
      .map(s => MyCaseClass(...))
      ...
   }
}

object MainObj {
   def apply(arguments: Arguments, sparkConf: Traversable[(String, String)]) = 
      new MainObj(arguments, sparkConf)

   def main(args: Array[String]): Unit = {
      MainObj(...).run()
   }
}

在这种情况下,我得到:

如果无法访问定义该类的范围,则无法为内部类 MainObj$MyCaseClass 生成编码器。

如果我在run() 内部或之前添加org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this),我会得到:

引起:java.io.NotSerializableException: MainObj 序列化堆栈:对象不可序列化(类:MainObj,值:MainObj@2f11d889)

我还尝试将案例类移动到单独的文件(不起作用)或 run() 内部(甚至无法编译)。

对这个问题感到非常沮丧...谁能帮忙,或者至少给我指出一个解释case classesspark.implicitsscopes之间关系的地方?

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    您需要在根级别定义案例类,而不是在类/对象中。

    你能试试下面的结构吗?

    case class MyCaseClass(...)
    
    class MainObj {
       private val spark = SparkSession.builder()...getOrCreate()       
       import spark.implicits._
    
       def run() {       
          spark.read
          ...
          .map(s => MyCaseClass(...))
          ...
       }
    }
    
    object MainObj {
       def apply(arguments: Arguments, sparkConf: Traversable[(String, String)]) = 
          new MainObj(arguments, sparkConf)
    
       def main(args: Array[String]): Unit = {
          MainObj(...).run()
       }
    }
    

    【讨论】:

    • 你是对的!虽然我实际上尝试过这个。我发现我的问题有一个不同的根本原因,我没有包含在原始问题中,所以我接受你的回答是正确的(对于我提出的问题)。
    【解决方案2】:

    我在写问题时遗漏了一段非常重要的代码。

    其实,我损坏的代码是这样的:

    class MainObj {
       private val spark = SparkSession.builder()...getOrCreate()       
       import spark.implicits._
    
       case class MyCaseClass(...)
    
       // This is what I left out
       def someFunction() { 
          ...
       }
    
       def run() {       
          spark.read
          ...
          .map(s => { ...someFunction() ... }) // and this
          .map(s => MyCaseClass(...))
          ...
       }
    }
    
    object MainObj {
       def apply(arguments: Arguments, sparkConf: Traversable[(String, String)]) = 
          new MainObj(arguments, sparkConf)
    
       def main(args: Array[String]): Unit = {
          MainObj(...).run()
       }
    }
    

    这就是工作代码的样子:

    // FIXED: moved case class to the root scope
    case class MyCaseClass(...)
    
    class MainObj {
       private val spark = SparkSession.builder()...getOrCreate()       
       import spark.implicits._                  
    
       def run() {       
          spark.read
          ...
          .map(s => { ... MainObj.someFunction() ... }) // FIXED
          .map(s => MyCaseClass(...))
          ...
       }
    }
    
    object MainObj {
       // FIXED: moved function to a companion object; 
       // now calling it inside map(...) does not trigger serialization 
       // of MainObj object, it works like a static method call in Java
       def someFunction() {}
    
       def apply(arguments: Arguments, sparkConf: Traversable[(String, String)]) = 
          new MainObj(arguments, sparkConf)
    
       def main(args: Array[String]): Unit = {
          MainObj(...).run()
       }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2013-09-04
      • 2017-03-29
      • 2015-08-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-08-12
      • 1970-01-01
      相关资源
      最近更新 更多