【问题标题】:How to convert a string column with milliseconds to a timestamp with milliseconds in Spark 2.1 using Scala?如何使用 Scala 在 Spark 2.1 中将毫秒的字符串列转换为毫秒的时间戳?
【发布时间】:2017-12-06 19:02:25
【问题描述】:

我正在使用带有 Scala 的 Spark 2.1。

如何将以毫秒为单位的字符串列转换为以毫秒为单位的时间戳?

我从问题Better way to convert a string field into timestamp in Spark中尝试了以下代码

import org.apache.spark.sql.functions.unix_timestamp
val tdf = Seq((1L, "05/26/2016 01:01:01.601"), (2L, "#$@#@#")).toDF("id", "dts")
val tts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss.SSS").cast("timestamp")
tdf.withColumn("ts", tts).show(2, false)

但我得到的结果没有毫秒:

+---+-----------------------+---------------------+
|id |dts                    |ts                   |
+---+-----------------------+---------------------+
|1  |05/26/2016 01:01:01.601|2016-05-26 01:01:01.0|
|2  |#$@#@#                 |null                 |
+---+-----------------------+---------------------+

【问题讨论】:

标签: scala datetime apache-spark


【解决方案1】:

带有 SimpleDateFormat 的 UDF 有效。这个想法来自 Ram Ghadiyaram 与 UDF logic 的链接。

import java.text.SimpleDateFormat
import java.sql.Timestamp
import org.apache.spark.sql.functions.udf
import scala.util.{Try, Success, Failure}

val getTimestamp: (String => Option[Timestamp]) = s => s match {
  case "" => None
  case _ => {
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss.SSS")
    Try(new Timestamp(format.parse(s).getTime)) match {
      case Success(t) => Some(t)
      case Failure(_) => None
    }    
  }
}

val getTimestampUDF = udf(getTimestamp)
val tdf = Seq((1L, "05/26/2016 01:01:01.601"), (2L, "#$@#@#")).toDF("id", "dts")
val tts = getTimestampUDF($"dts")
tdf.withColumn("ts", tts).show(2, false)

带输出:

+---+-----------------------+-----------------------+
|id |dts                    |ts                     |
+---+-----------------------+-----------------------+
|1  |05/26/2016 01:01:01.601|2016-05-26 01:01:01.601|
|2  |#$@#@#                 |null                   |
+---+-----------------------+-----------------------+

【讨论】:

  • 太棒了!不幸的是,我没有任何火花测试环境:-)
  • @RamGhadiyaram - 这个 udf 的性能如何?它是否为每次调用 udf 创建一个新的 SimpleDateFormat?
  • @RemisHaroon 首先是我们需要使用像 date_format 这样的函数,它将使用简单的数据格式(因为所有优化都是在 spark scala 代码中完成的)在这种情况下似乎它没有给出正确的输出原始海报。所以他继续做udf。创建简单的日期格式影响较小。如果您觉得它可以在 udf 之外(一个对象而不是多个对象),请随时进行更改和测试。
【解决方案2】:

有一种比制作 UDF 更简单的方法。只需解析毫秒数据并将其添加到 unix 时间戳(以下代码适用于 pyspark,应该非常接近 scala 等效项):

timeFmt = "yyyy/MM/dd HH:mm:ss.SSS"
df = df.withColumn('ux_t', unix_timestamp(df.t, format=timeFmt) + substring(df.t, -3, 3).cast('float')/1000)

结果: '2017/03/05 14:02:41.865' 转换为 1488722561.865

【讨论】:

    【解决方案3】:
    import org.apache.spark.sql.functions;
    import org.apache.spark.sql.types.DataTypes;
    
    
    dataFrame.withColumn(
        "time_stamp", 
        dataFrame.col("milliseconds_in_string")
            .cast(DataTypes.LongType)
            .cast(DataTypes.TimestampType)
    )
    

    代码是java,很容易转换成scala

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2016-08-01
      • 1970-01-01
      • 2016-03-04
      • 2013-03-09
      • 1970-01-01
      相关资源
      最近更新 更多