【问题标题】:How to parse CSV with a Dataset based on a case class?如何使用基于案例类的数据集解析 CSV?
【发布时间】:2016-08-30 03:22:57
【问题描述】:

我正在尝试使用新的 Spark 1.6.0 API 数据集解析 CSV。无论如何,我在这样做时遇到了一些问题。我想为每个 CSV 行创建一个 case class

这是代码:

case class MyData (forename:String, surname:String, age:Integer)

    def toMyData(text: String): Dataset[MyData] = {
      val splits: Array[String] = text.split("\t")
      Seq(MyData(
        forename = splits(0),
        surname = splits(1),
        age = splits(2).asInstanceOf[Integer]
      )).toDS()
    }

    val lines:Dataset[MyData] = sqlContext.read.text("/data/mydata.csv").as[MyData]
    lines.map(r => toMyData(r)).foreach(println)

我的 toMyData 只是 Encoder 的一种,但我不知道如何按照 API 正确执行此操作。

有什么想法吗?

编辑:

我以这种方式更改了代码,但我什至无法编译:

val lines:Dataset[MyData] = sqlContext.read.text("/data/mydata.csv").as[MyData]
    lines.map(r => toMyData(r)).foreach(println)

def toMyData(text: String): Dataset[MyData] = {
      val df = sc.parallelize(Seq(text)).toDF("value")

      df.map(_.getString(0).split("\t") match {
        case Array(fn, sn, age) =>
          MyData(fn, sn, age.asInstanceOf[Integer])
      }).toDS

    }

    sqlContext.read.text("/data/mydata.csv").as[String].map(r => toMyData(r)).collect().foreach(println)

据我所知:

Error:(50, 10) value toDS is not a member of org.apache.spark.rdd.RDD[MyData]
possible cause: maybe a semicolon is missing before `value toDS'?
      }).toDS
         ^
Error:(54, 133) Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._  Support for serializing other types will be added in future releases.
    sqlContext.read.text("/data/mydata.csv").as[String].map(r => toMyData(r)).collect().foreach(println)

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    忽略格式验证和异常处理:

    //  Simulate sqlContext.read.text("/data/mydata.csv")
    val df = sc.parallelize(Seq("John\tDoe\t22")).toDF("value")
    
    df.rdd.map(_.getString(0).split("\t") match {
      case Array(fn, sn, age) => MyData(fn, sn, age.toInt)
    }).toDS
    

    或不转换为 RDD:

    import org.apache.spark.sql.functions.regexp_extract
    
    val pattern = "^(.*?)\t(.*?)\t(.*)$"
    val exprs = Seq(
      (1, "forename", "string"), (2, "surname", "string"), (3, "age", "integer")
    ).map{case (i, n, t) => regexp_extract($"value", pattern, i).alias(n).cast(t)}
    
    df
      .select(exprs: _*)  // Convert to (StringType, StringType, IntegerType)
      .as[MyData]  // cast
    

    总结:

    • 不要使用嵌套动作、转换或 DDS。
    • 在使用之前阅读asInstanceOf 的工作原理。此处不适用。

    【讨论】:

    • 我认为.as[String].transform(r => toMyData(r)) 的方法有任何意义。无论如何,我也会尝试您的解决方案。谢谢
    • toMyData 放置在转换中根本无法工作。数据集是分布式结构,不能嵌套。
    • 不,它没有。仔细检查类型。地图函数的类型是 Row => MyData 而不是 Row => DataSet[MyData]
    • @zero323 答案完美运行,您只需将val df =.. 更改为sqlContext.read.text("") 即可获得每行一个字符串的DF?
    猜你喜欢
    • 2012-03-31
    • 2014-07-03
    • 2021-08-11
    • 2010-11-20
    • 2017-08-31
    相关资源
    最近更新 更多