【问题标题】:Generic T as Spark Dataset[T] constructor通用 T 作为 Spark Dataset[T] 构造函数
【发布时间】:2018-03-03 01:40:17
【问题描述】:

在下面的 sn-p 中,tryParquet 函数尝试从 Parquet 文件中加载数据集(如果存在)。如果不是,它会计算、保存并返回提供的数据集计划:

import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset

sealed trait CustomRow

case class MyRow(
  id: Int,
  name: String
) extends CustomRow

val ds: Dataset[MyRow] =
  Seq((1, "foo"),
      (2, "bar"),
      (3, "baz")).toDF("id", "name").as[MyRow]

def tryParquet[T <: CustomRow](session: SparkSession, path: String, target: Dataset[T]): Dataset[T] =
    Try(session.read.parquet(path)) match {
      case Success(df) => df.as[T] // <---- compile error here
      case Failure(_)  => {
        target.write.parquet(path)
        target
      }
    }

val readyDS: Dataset[MyRow] =
    tryParquet(spark, "/path/to/file.parq", ds)

但是这会在df.as[T] 上产生编译错误:

找不到存储在数据集中的类型的编码器。导入 spark.implicits 支持原始类型(Int、String 等)和产品类型(案例类)。_

将在未来的版本中添加对序列化其他类型的支持。

案例成功(df) => df.as[T]

可以通过使tryParquet 强制转换df 返回一个无类型的DataFrame 并让调用者强制转换为所需的构造函数来规避这个问题。但是,如果我们希望类型由函数内部管理,有什么解决方案吗?

【问题讨论】:

    标签: scala apache-spark apache-spark-dataset apache-spark-encoders


    【解决方案1】:

    看起来可以在类型参数中使用Encoder

    import org.apache.spark.sql.Encoder
    
    def tryParquet[T <: CustomRow: Encoder](...)
    

    这样编译器可以证明df.as[T]在构造对象时提供了一个Encoder。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-01-25
      • 2016-03-19
      • 2011-05-17
      • 1970-01-01
      • 2012-05-14
      • 1970-01-01
      • 1970-01-01
      • 2020-05-18
      相关资源
      最近更新 更多