【问题标题】:Read csv file in spark of varying columns在不同列的火花中读取 csv 文件
【发布时间】:2019-08-16 00:23:03
【问题描述】:

我想使用 Scala 将 csv 文件读入 spark 中的数据帧。 我的 csv 文件的第一条记录有三列,其余记录有 5 列。我的 csv 文件没有列名。我已经提到这里是为了理解

Ex:
I'dtype  date             recordsCount
0          13-02-2015  300
I'dtype  date          type      location.     locationCode
1         13-02-2015.    R.          USA.            Us
1.        13-02-2015.    T.          London.      Lon

我的问题是我将如何将此文件读入数据框,因为第一行和其余行具​​有不同的列。 我尝试的解决方案是将文件读取为 rdd 并过滤掉标头记录,然后将剩余记录转换为数据帧。 有没有更好的解决方案?请帮帮我

【问题讨论】:

  • 最后两列可以为空吗?
  • 看起来除了有两个不同的列长度之外,您还有完全不同的列,第三列是记录计数或类型。这使事情变得更加复杂,可能应该在您的问题中注明。

标签: scala csv apache-spark


【解决方案1】:

您可以将文件加载为原始文本,然后使用案例类、Either 实例和模式匹配来整理出内容。下面的例子。

case class Col3(c1: Int, c2: String, c3: Int)
case class Col5(c1: Int, c2: String, c5_col3: String, c4:String, c5: String)
case class Header(value: String)

type C3 = Either[Header, Col3]
type C5 = Either[Header, Col5]

// assume sqlC & sc created 

val path = "tmp.tsv"
val rdd = sc.textFile(path)

val eitherRdd: RDD[Either[C3, C5]] = rdd.map{s =>
  val spl = s.split("\t")
  spl.length match{
    case 3 =>
      val res = Try{
        Col3(spl(0).toInt, spl(1), spl(2).toInt)
      }
      res match{
        case Success(c3) => Left(Right(c3))
        case Failure(_) => Left(Left(Header(s)))
      }
    case 5 =>
      val res = Try{
        Col5(spl(0).toInt, spl(1), spl(2), spl(3), spl(4))
      }
      res match{
        case Success(c5) => Right(Right(c5))
        case Failure(_) => Right(Left(Header(s)))
      }
    case _ => throw new Exception("fail")
  }
}

val rdd3 = eitherRdd.flatMap(_.left.toOption)
val rdd3Header = rdd3.flatMap(_.left.toOption).collect().head
val df3 = sqlC.createDataFrame(rdd3.flatMap(_.right.toOption))

val rdd5 = eitherRdd.flatMap(_.right.toOption)
val rdd5Header = rdd5.flatMap(_.left.toOption).collect().head
val df5 = sqlC.createDataFrame(rdd5.flatMap(_.right.toOption))

df3.show()

df5.show()

用下面的简单 tsv 测试:

col1    col2    col3
0   sfd 300
1   asfd    400
col1    col2    col4    col5    col6
2   pljdsfn R   USA Us
3   sad T   London  Lon

给出输出

+---+----+---+
| c1|  c2| c3|
+---+----+---+
|  0| sfd|300|
|  1|asfd|400|
+---+----+---+

+---+-------+-------+------+---+
| c1|     c2|c5_col3|    c4| c5|
+---+-------+-------+------+---+
|  2|pljdsfn|      R|   USA| Us|
|  3|    sad|      T|London|Lon|
+---+-------+-------+------+---+

为了简单起见,我忽略了日期格式,只是将这些字段存储为字符串。但是,添加日期解析器以获取正确的列类型并不会复杂得多。

同样,我依靠解析失败来指示标题行。如果解析不会失败,或者必须做出更复杂的决定,您可以替换不同的逻辑。同样,需要更复杂的逻辑来区分相同长度的不同记录类型,或者可能包含(转义)拆分字符

【讨论】:

  • 非常感谢您提供详细的答案。
【解决方案2】:

这有点小技巧,但这里有一个忽略文件第一行的解决方案。

val cols = Array("dtype", "date", "type", "location", "locationCode")
val schema = new StructType(cols.map(n => StructField(n ,StringType, true)))
spark.read
    .schema(schema) // we specify the schema
    .option("header", true) // and tell spark that there is a header
    .csv("path/file.csv")

第一行是标题,但指定了架构。第一行因此被忽略。

【讨论】:

  • 我不想忽略标题。我需要标题进行进一步处理。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-07-18
  • 2017-05-19
  • 2020-04-05
相关资源
最近更新 更多