【问题标题】:How to load CSVs with timestamps in custom format?如何以自定义格式加载带有时间戳的 CSV?
【发布时间】:2017-04-06 15:28:09
【问题描述】:

我在 csv 文件中有一个时间戳字段,我使用 spark csv 库将其加载到数据帧中。同一段代码在我的本地计算机上运行 Spark 2.0 版本,但在 Azure Hortonworks HDP 3.5 和 3.6 上引发错误。

我检查过,Azure HDInsight 3.5 也使用相同的 Spark 版本,所以我认为这不是 Spark 版本的问题。

import org.apache.spark.sql.types._
val sourceFile = "C:\\2017\\datetest"
val sourceSchemaStruct = new StructType()
  .add("EventDate",DataTypes.TimestampType)
  .add("Name",DataTypes.StringType)
val df = spark.read
  .format("com.databricks.spark.csv")
  .option("header","true")
  .option("delimiter","|")
  .option("mode","FAILFAST")
  .option("inferSchema","false")
  .option("dateFormat","yyyy/MM/dd HH:mm:ss.SSS")
  .schema(sourceSchemaStruct)
  .load(sourceFile)

整个异常如下:

Caused by: java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
  at java.sql.Timestamp.valueOf(Timestamp.java:237)
  at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:179)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply$mcJ$sp(UnivocityParser.scala:142)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply(UnivocityParser.scala:142)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13$$anonfun$apply$2.apply(UnivocityParser.scala:142)
  at scala.util.Try.getOrElse(Try.scala:79)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13.apply(UnivocityParser.scala:139)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$13.apply(UnivocityParser.scala:135)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$nullSafeDatum(UnivocityParser.scala:179)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9.apply(UnivocityParser.scala:135)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9.apply(UnivocityParser.scala:134)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert(UnivocityParser.scala:215)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.parse(UnivocityParser.scala:187)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:304)
  at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:304)
  at org.apache.spark.sql.execution.datasources.FailureSafeParser.parse(FailureSafeParser.scala:61)
  ... 27 more

csv文件只有一行如下:

"EventDate"|"Name"
"2016/12/19 00:43:27.583"|"adam"

【问题讨论】:

    标签: apache-spark apache-spark-sql hortonworks-data-platform azure-hdinsight


    【解决方案1】:

    TL;DR 使用timestampFormat 选项(不是dateFormat)。


    我已经成功地在最新的 Spark 版本 2.3.0-SNAPSHOT 中重现了它(由 master 构建)。

    // OS shell
    $ cat so-43259485.csv
    "EventDate"|"Name"
    "2016/12/19 00:43:27.583"|"adam"
    
    // spark-shell
    scala> spark.version
    res1: String = 2.3.0-SNAPSHOT
    
    case class Event(EventDate: java.sql.Timestamp, Name: String)
    import org.apache.spark.sql.Encoders
    val schema = Encoders.product[Event].schema
    
    scala> spark
      .read
      .format("csv")
      .option("header", true)
      .option("mode","FAILFAST")
      .option("delimiter","|")
      .schema(schema)
      .load("so-43259485.csv")
      .show(false)
    17/04/08 11:03:42 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 7)
    java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
        at java.sql.Timestamp.valueOf(Timestamp.java:237)
        at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:167)
        at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply$mcJ$sp(UnivocityParser.scala:146)
        at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply(UnivocityParser.scala:146)
        at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply(UnivocityParser.scala:146)
        at scala.util.Try.getOrElse(Try.scala:79)
    

    corresponding line in the Spark sources 是问题的“根本原因”:

    Timestamp.valueOf(s)
    

    阅读了javadoc of Timestamp.valueOf,您可以了解到参数应该是:

    yyyy-[m]m-[d]d hh:mm:ss[.f...] 格式的时间戳。小数秒可以省略。 mm 和 dd 的前导零也可以省略。

    注意“小数秒可能会被省略”,所以让我们先将 EventDate 作为字符串加载,然后仅在删除不需要的小数秒后将其转换为时间戳来切断它。

    val eventsAsString = spark.read.format("csv")
      .option("header", true)
      .option("mode","FAILFAST")
      .option("delimiter","|")
      .load("so-43259485.csv")
    

    事实证明,for fields of TimestampType type Spark uses timestampFormat option 首先如果定义且仅在未定义时使用 the code the uses Timestamp.valueOf

    事实证明,解决方法只是使用timestampFormat 选项(而不是dateFormat!)。

    val df = spark.read
      .format("com.databricks.spark.csv")
      .option("header","true")
      .option("delimiter","|")
      .option("mode","FAILFAST")
      .option("inferSchema","false")
      .option("timestampFormat","yyyy/MM/dd HH:mm:ss.SSS")
      .schema(sourceSchemaStruct)
      .load(sourceFile)
    scala> df.show(false)
    +-----------------------+----+
    |EventDate              |Name|
    +-----------------------+----+
    |2016-12-19 00:43:27.583|adam|
    +-----------------------+----+
    

    火花 2.1.0

    在 CSV 中使用 inferSchema 选项和您的自定义 timestampFormat 进行架构推断。

    使用inferSchema 触发架构推断以使timestampFormat 生效非常重要。

    val events = spark.read
      .format("csv")
      .option("header", true)
      .option("mode","FAILFAST")
      .option("delimiter","|")
      .option("inferSchema", true)
      .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
      .load("so-43259485.csv")
    
    scala> events.show(false)
    +-------------------+----+
    |EventDate          |Name|
    +-------------------+----+
    |2016-12-19 00:43:27|adam|
    +-------------------+----+
    
    scala> events.printSchema
    root
     |-- EventDate: timestamp (nullable = true)
     |-- Name: string (nullable = true)
    

    “不正确”初始版本留作学习之用

    val events = eventsAsString
      .withColumn("date", split($"EventDate", " ")(0))
      .withColumn("date", translate($"date", "/", "-"))
      .withColumn("time", split($"EventDate", " ")(1))
      .withColumn("time", split($"time", "[.]")(0))    // <-- remove millis part
      .withColumn("EventDate", concat($"date", lit(" "), $"time")) // <-- make EventDate right
      .select($"EventDate" cast "timestamp", $"Name")
    
    scala> events.printSchema
    root
     |-- EventDate: timestamp (nullable = true)
     |-- Name: string (nullable = true)
        events.show(false)
    
    scala> events.show
    +-------------------+----+
    |          EventDate|Name|
    +-------------------+----+
    |2016-12-19 00:43:27|adam|
    +-------------------+----+
    

    火花 2.2.0

    从 Spark 2.2 开始,您可以使用 to_timestamp 函数进行字符串到时间戳的转换。

    eventsAsString.select($"EventDate", to_timestamp($"EventDate", "yyyy/MM/dd HH:mm:ss.SSS")).show(false)
    
    scala> eventsAsString.select($"EventDate", to_timestamp($"EventDate", "yyyy/MM/dd HH:mm:ss.SSS")).show(false)
    +-----------------------+----------------------------------------------------+
    |EventDate              |to_timestamp(`EventDate`, 'yyyy/MM/dd HH:mm:ss.SSS')|
    +-----------------------+----------------------------------------------------+
    |2016/12/19 00:43:27.583|2016-12-19 00:43:27                                 |
    +-----------------------+----------------------------------------------------+
    

    【讨论】:

    • 如果我们想在同一个文件中解析多个时间戳格式怎么办,例如,在我的 csv 文件中,我有一些时间戳,如“dd/MM/yyyy”和“dd-mm-yyyy”我希望能够将两者都解析为时间戳,我尝试了这个 .Option("TimeStampFormat", "dd/MM/yyyy, dd-MM-yyyy") 但它不起作用
    • @Pugnatore 不适用于不一致的时间戳格式。您必须创建一个 UDF 才能自己进行解析。
    【解决方案2】:

    我搜索了这个问题,发现了官方 Github 问题页面https://github.com/databricks/spark-csv/pull/280,它修复了一个相关的错误,用于解析具有自定义日期格式的数据。我查看了一些源代码,并根据code找出您的问题原因,设置为inferSchema,默认值为false,如下所示。

    inferSchema:自动推断列类型。它需要对数据进行一次额外的传递,并且默认为 false

    请将inferSchema 更改为true 为您的日期格式yyyy/MM/dd HH:mm:ss.SSS 使用SimpleDateFormat

    【讨论】:

    • 我修改了我的代码如下并运行它。它仍然产生相同的错误 val df = spark.read.format("com.databricks.spark.csv").option("header","true").option("delimiter","|").option ("mode","FAILFAST").option("inferSchema","true").option("dateFormat","yyyy/MM/dd HH:mm:ss.SSS").schema(sourceSchemaStruct).load(源文件)
    猜你喜欢
    • 1970-01-01
    • 2013-04-06
    • 2014-10-20
    • 1970-01-01
    • 2017-05-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-04-28
    相关资源
    最近更新 更多