【问题标题】:PySpark TimestampType() providing wrong conversion: ValueError: year 52129 is out of rangePySpark TimestampType() 提供错误的转换:ValueError: year 52129 is out of range
【发布时间】:2020-05-21 09:20:14
【问题描述】:

我在 PySpark 中通过 Kafka 从 MongoDB 获取具有时间戳值的集合。在 MongoDB 中,架构如下:

"Timestamp": {
        "$date": "2020-02-28T11:24:28.810Z"
    }

在 PySpark 中,我使用以下架构:

StructType([...
         StructField("Timestamp",StructType([StructField("$date",TimestampType(),True)]), True), \
         ...

我正在使用 from_json() 来解析 json 字符串:

data_stream_clean = data_stream_after \
                .select(from_json(col("json_string"), self.schema) \
                .alias("detail")) \
                .select("detail.*") \
                .withColumn("Timestamp", col("Timestamp").getField("$date"))

然后我正在创建一个 tempView 来访问列,它显示:

+---+--------------------+
| Id|           Timestamp|
+---+--------------------+
|231|52129-10-04 10:00...

这是 2020-02-28T11:24:28.810Z 的错误转换。我无法将其转换为显示以下错误的 df:

ValueError: year 52129 is out of range

我还使用了 unix_timestamp(),它显示了正确的转换,即 1582889068810,但采用 int 数据类型。但是我想让我的数据在时间戳中。

【问题讨论】:

  • 为什么您的数据框架构将 Timestamp 定义为时间戳数组?
  • 架构也包含其他字段。

标签: mongodb apache-spark pyspark apache-spark-sql


【解决方案1】:

我尝试将您的示例输入读取为 josn,它在 scala 中运行良好。 你能告诉你如何加载数据框或场景吗?

代码

  val spark = sqlContext.sparkSession
    val implicits = spark.implicits
    import implicits._
    import org.apache.spark.sql.catalyst.ScalaReflection

    val data =
      """
        |{
        |   "Timestamp": {
        |       "$date": "2020-02-28T11:24:28.810Z"
        |   }
        |}
      """.stripMargin
    val schema = StructType(Array(StructField("Timestamp",StructType(Array(StructField("$date", DataTypes.TimestampType))))))


    val ds = spark.read
      .schema(schema)
      .json(Seq(data).toDS())

    ds.show(false)
    ds.printSchema()

结果-

+------------------------+
|Timestamp               |
+------------------------+
|[2020-02-28 16:54:28.81]|
+------------------------+

root
 |-- Timestamp: struct (nullable = true)
 |    |-- $date: timestamp (nullable = true)

即使我尝试将其读取为string,然后将该列转换为Timestamp,但结果是一样的-

 val data =
      """
        |{
        |   "Timestamp": {
        |       "$date": "2020-02-28T11:24:28.810Z"
        |   }
        |}
      """.stripMargin
    val schema = StructType(Array(StructField("Timestamp",StructType(Array(StructField("$date", DataTypes.StringType))))))


    val ds = spark.read
      .schema(schema)
      .json(Seq(data).toDS())

    ds.show(false)
    ds.printSchema()

    ds.select(col("Timestamp.$date").cast(DataTypes.TimestampType)).show(false)

结果:

+--------------------------+
|Timestamp                 |
+--------------------------+
|[2020-02-28T11:24:28.810Z]|
+--------------------------+

root
 |-- Timestamp: struct (nullable = true)
 |    |-- $date: string (nullable = true)

+----------------------+
|$date                 |
+----------------------+
|2020-02-28 16:54:28.81|
+----------------------+

如果您使用DataFrameReader 加载数据,那么您可以使用以下选项更改格式 -

.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")

来自_json

从 from_json 加载数据对我来说效果很好

 val df = Seq(data).toDF("json_string")
      .select(from_json(col("json_string"), schema).alias("detail"))
      .select("detail.*")
    .withColumn("Timestamp", col("Timestamp").getField("$date"))
df.show(false)
    df.printSchema()

+------------------------+
|Timestamp               |
+------------------------+
|2020-02-28T11:24:28.810Z|
+------------------------+

root
 |-- Timestamp: string (nullable = true)

【讨论】:

  • 嗨,我正在使用 from_json 和方案来解析 json。我已经更新了问题。
  • 甚至 from_json 也运行良好。请看我编辑的回复
  • 我不确定发生了什么。我用unix_timestamp实现了毫秒单位的时间戳,然后除以1000实现了我的应用。
猜你喜欢
  • 2021-03-30
  • 2019-04-10
  • 1970-01-01
  • 2021-05-07
  • 1970-01-01
  • 1970-01-01
  • 2019-04-28
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多