我根据您在 ES 6.4/Spark 2.1 版本中的数据创建了一个示例文档,并使用了以下代码,以便将 GenerateTime 字段读取为 text 而不是火花中的日期类型。
ES 中的映射
PUT somedateindex
{
"mappings": {
"mydocs":{
"properties": {
"GenerateTime": {
"type": "date",
"format": "yyyy/MM/dd HH:mm:ss"
}
}
}
}
}
请注意,ES 中的字段是date 类型。
使用 ES 中的日期字段作为字符串的 Spark 代码
请注意,我使用了配置 option("es.mapping.date.rich", false)
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.master", "local")
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
val df = spark.read.format("org.elasticsearch.spark.sql")
.option("es.resource.read","somedateindex")
.option("es.nodes", "some_host_name")
.option("es.mapping.date.rich", false)
.option("es.port","9200")
.load()
df.show()
df.printSchema()
我的 Eclipse 控制台中的 Spark 代码结果:
19/05/13 03:10:53 INFO DAGScheduler: Job 1 finished: show at Elasticsearch.scala:134, took 9.424294 s
19/05/13 03:10:53 INFO CodeGenerator: Code generated in 21.256205 ms
+-------------------+
| GenerateTime|
+-------------------+
|2019/05/06 19:31:21|
+-------------------+
root
|-- GenerateTime: string (nullable = true)
19/05/13 03:10:53 INFO SparkUI: Stopped Spark web UI at....
注意printSchema 表明该表有一个GenerateTime 类型的string 列。
如果您不想继续更改映射,上述内容应该对您有所帮助。
我建议使用日期格式而不是文本格式的日期字段,并且也使用 ISO-8601 支持的格式,这样当类型推断开始时,您最终会在 Spark 中获得正确类型的数据,您可以简单地专注于业务逻辑,很多时候正确的解决方案在于我们如何存储数据,而不是我们如何处理它。
将字符串转换为时间戳/日期的 Spark 代码
但是,如果由于某种原因您无法从源(即 elasticsearch)更改映射,您可以进一步添加以下代码,使用以下代码将字符串值转换为时间戳:
import org.apache.spark.sql.functions._
//String into Timestamp Transformation
val df2_timestamp = df.withColumn("GenerateTime_timestamp", from_unixtime(unix_timestamp($"GenerateTime", "yyyy/MM/dd HH:mm:ss")).cast(TimestampType))
df2_timestamp.show(false)
df2_timestamp.printSchema();
如果你运行上面的代码,你会看到如下输出:
19/05/14 11:33:10 INFO CodeGenerator: Code generated in 23.742359 ms
+-------------------+----------------------+
|GenerateTime |GenerateTime_timestamp|
+-------------------+----------------------+
|2019/05/06 19:31:21|2019-05-06 19:31:21.0 |
+-------------------+----------------------+
root
|-- GenerateTime: string (nullable = true)
|-- GenerateTime_timestamp: timestamp (nullable = true)
19/05/14 11:33:10 INFO SparkContext: Invoking stop() from shutdown hook
另请注意,我的解决方案是在 Scala 中。如果有帮助,请告诉我!