【问题标题】:Convert data frame to strong typed data set?将数据框转换为强类型数据集?
【发布时间】:2017-10-24 01:25:04
【问题描述】:

我有以下类,run 从数据库表中返回一个整数列表。

class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
  def run(date: LocalDate) = {
    sqlContext.read.format("jdbc").options(Map(
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "url" -> jdbcSqlConn,
      "dbtable" -> s"dbo.GetList('$date')"
    )).load()
  }
}

以下代码

val conf = new SparkConf()
val sc = new SparkContext(conf.setAppName("Test").setMaster("local[*]"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val itemListJob = new ItemList(sqlContext, jdbcSqlConn)
val processed = itemListJob.run(rc, priority).select("id").map(d => {
  runJob.run(d) // d expected to be int
})
processed.saveAsTextFile("c:\\temp\\mpa")

得到错误

[错误] ...\src\main\scala\main.scala:39:类型不匹配; [错误] 发现:org.apache.spark.sql.Row [错误] 必需:整数 [错误] runJob.run(d) [错误] ^ [错误] 发现一个错误 [错误] (compile:compileIncremental) 编译失败

我试过了

  1. val processed = itemListJob.run(rc, priority).select("id").as[Int].map(d =>
  2. case class itemListRow(id: Int); ....as[itemListRow].

他们都有错误

找不到存储在数据集中的类型的编码器。通过导入 spark.implicits 支持原始类型(Int、String 等)和产品类型(案例类)。未来版本中将添加对序列化其他类型的支持。

更新: 我正在尝试添加导入隐式语句

  1. import sc.implicits._ 得到错误

    值隐式不是 org.apache.spark.SparkContext 的成员

  2. import sqlContext.implicits._ 没问题。但是processed.saveAsTextFile("c:\\temp\\mpa")后面的语句得到了

    的错误

    值 saveAsTextFile 不是 org.apache.spark.sql.Dataset[(Int, java.time.LocalDate)] 的成员

【问题讨论】:

  • 使用as[Int]时出现什么错误?
  • 错误是Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
  • 你试过import spark.implicits._吗?
  • import spark.implicits._得到not found: value spark的错误
  • 您需要创建一个 Spark 会话,如下所述:spark.apache.org/docs/latest/…

标签: scala apache-spark


【解决方案1】:

您只需将带有select("id") 的行更改为如下:

select("id").as[Int]

您应该导入用于将 Rows 转换为 Ints 的隐式。

import sqlContext.implicits._ // <-- import implicits that add the "magic"

您还可以更改 run 以包含转换如下(注意我添加的行中的 cmets):

class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
  def run(date: LocalDate) = {
    import sqlContext.implicits._ // <-- import implicits that add the "magic"
    sqlContext.read.format("jdbc").options(Map(
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "url" -> jdbcSqlConn,
      "dbtable" -> s"dbo.GetList('$date')"
    )).load()
    .select("id") // <-- take only "id" (which Spark pushes down and hence makes your query faster
    .as[Int] // <-- convert Row into Int
  }
}

值 saveAsTextFile 不是 org.apache.spark.sql.Dataset[(Int, java.time.LocalDate)] 的成员

编译错误是因为您尝试对Dataset 使用不可用的saveAsTextFile 操作。

通过DataFrameWriter 编写Spark SQL,可以使用write 操作符:

write: DataFrameWriter[T] 用于将非流数据集的内容保存到外部存储的接口。

所以你应该做到以下几点:

processed.write.text("c:\\temp\\mpa")

完成!

【讨论】:

  • 我更改了ItemList.run() 以按照您的建议返回数据集。但是,我需要在调用val processed = itemListJob.run(...).map(....) 之前添加import sqlContext.implicits._
猜你喜欢
  • 2010-11-15
  • 1970-01-01
  • 2016-03-18
  • 2023-04-01
  • 1970-01-01
  • 1970-01-01
  • 2022-06-15
  • 2011-06-22
  • 1970-01-01
相关资源
最近更新 更多