【问题标题】:Passing case class into function arguments将案例类传递给函数参数
【发布时间】:2018-12-03 10:15:16
【问题描述】:

很抱歉问了一个简单的问题。我想将案例类传递给函数参数,并且我想在函数内部进一步使用它。到目前为止,我已经尝试使用 TypeTagClassTag 进行此操作,但由于某种原因,我无法正确使用它,或者我可能没有看正确的地方。

用例类似于:

case class infoData(colA:Int,colB:String)
case class someOtherData(col1:String,col2:String,col3:Int)

def readCsv[T:???](path:String,passedCaseClass:???): Dataset[???] = {
  sqlContext
    .read
    .option("header", "true")
    .csv(path)
    .as[passedCaseClass]
}

它会被这样称呼:

val infoDf = readCsv("/src/main/info.csv",infoData)
val otherDf = readCsv("/src/main/someOtherData.csv",someOtherData)

【问题讨论】:

  • as 的签名是什么?是否需要一个隐式的“阅读器”来进行转换(例如 PlayJSON)?然后你可以做类似def readCSV[T](path: String)(implicit reader: Reader[T]): Dataset[T]
  • 这里(在 Spark 中)的“阅读器”看起来被称为 Encoder[T]。因此,该类型的隐式应该可以工作。
  • 嗨,据我了解,ascase class 名称作为参数将dataframe 转换为dataset 中的dataset。我像这样使用它。我还在学习sparkscala。你说的那种类型的隐含是什么意思?你可以解释吗?如果我有val infoEncoder = someEncoder 之类的东西,那么你的意思是我应该在readCsv 方法中传递infoEncoder 吗?

标签: scala apache-spark apache-spark-dataset case-class classtag


【解决方案1】:

有两点需要注意,

  1. 类名应该在CamelCase,所以InfoData
  2. 将类型绑定到DataSet 后,它就不是DataFrameDataFrame 是通用RowDataSet 的特殊名称。

您需要确保您提供的类在当前范围内具有对应Encoder 的隐式实例。

case class InfoData(colA: Int, colB: String)

Encoder 原始类型(IntString 等)和case classes 的实例可以通过导入spark.implicits._ 获得

def readCsv[T](path: String)(implicit encoder: Encoder: T): Dataset[T] = {
  spark
    .read
    .option("header", "true")
    .csv(path)
    .as[T]
}

或者,您可以使用上下文绑定,

def readCsv[T: Encoder[T]](path: String): Dataset[T] = {
  spark
    .read
    .option("header", "true")
    .csv(path)
    .as[T]
}

现在,你可以如下使用它,

val spark = ...

import spark.implicits._

def readCsv[T: Encoder[T]](path: String): Dataset[T] = {
  spark
    .read
    .option("header", "true")
    .csv(path)
    .as[T]
}

val infoDS = readCsv[InfoData]("/src/main/info.csv")

【讨论】:

    【解决方案2】:

    首先将您的函数定义更改为:

    object t0 {
        def readCsv[T] (path: String)(implicit spark: SparkSession, encoder: Encoder[T]): Dataset[T] = {
          spark
            .read
            .option("header", "true")
            .csv(path)
            .as[T]
        }
    }
    

    您不需要执行任何类型的反射来创建通用的 readCsv 函数。这里的关键是 Spark 在编译时需要编码器。所以你可以将它作为隐式参数传递,编译器会添加它。

    因为 Spark SQL 可以反序列化产品类型(您的案例类),包括默认编码器,所以很容易像这样调用您的函数:

    case class infoData(colA: Int, colB: String)
    case class someOtherData(col1: String, col2: String, col3: Int)
    
    object test {
      import t0._
    
      implicit val spark = SparkSession.builder().getOrCreate()
    
      import spark.implicits._
      readCsv[infoData]("/tmp")
    
    }
    

    希望对你有帮助

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2013-12-23
      • 2010-12-12
      • 1970-01-01
      • 2013-01-27
      • 1970-01-01
      • 2019-09-29
      • 1970-01-01
      相关资源
      最近更新 更多