【发布时间】:2020-05-21 09:20:14
【问题描述】:
我在 PySpark 中通过 Kafka 从 MongoDB 获取具有时间戳值的集合。在 MongoDB 中,架构如下:
"Timestamp": {
"$date": "2020-02-28T11:24:28.810Z"
}
在 PySpark 中,我使用以下架构:
StructType([...
StructField("Timestamp",StructType([StructField("$date",TimestampType(),True)]), True), \
...
我正在使用 from_json() 来解析 json 字符串:
data_stream_clean = data_stream_after \
.select(from_json(col("json_string"), self.schema) \
.alias("detail")) \
.select("detail.*") \
.withColumn("Timestamp", col("Timestamp").getField("$date"))
然后我正在创建一个 tempView 来访问列,它显示:
+---+--------------------+
| Id| Timestamp|
+---+--------------------+
|231|52129-10-04 10:00...
这是 2020-02-28T11:24:28.810Z 的错误转换。我无法将其转换为显示以下错误的 df:
ValueError: year 52129 is out of range
我还使用了 unix_timestamp(),它显示了正确的转换,即 1582889068810,但采用 int 数据类型。但是我想让我的数据在时间戳中。
【问题讨论】:
-
为什么您的数据框架构将 Timestamp 定义为时间戳数组?
-
架构也包含其他字段。
标签: mongodb apache-spark pyspark apache-spark-sql