【问题标题】:PySpark 1.5 How to Truncate Timestamp to Nearest Minute from secondsPySpark 1.5 如何将时间戳从秒截断到最近的分钟
【发布时间】:2016-03-17 20:50:22
【问题描述】:

我正在使用 PySpark。我在数据框('canon_evt')中有一个列('dt'),这是一个时间戳。我正在尝试从 DateTime 值中删除秒。它最初是从 parquet 作为字符串读入的。然后我尝试通过

将其转换为时间戳
canon_evt = canon_evt.withColumn('dt',to_date(canon_evt.dt))
canon_evt= canon_evt.withColumn('dt',canon_evt.dt.astype('Timestamp'))

然后我想删除秒。我尝试了“trunc”、“date_format”,甚至尝试将片段连接在一起,如下所示。我认为它需要某种 map 和 lambda 组合,但我不确定 Timestamp 是否是一种合适的格式,以及是否可以摆脱秒。

canon_evt = canon_evt.withColumn('dyt',year('dt') + '-' + month('dt') +
    '-' + dayofmonth('dt') + ' ' + hour('dt') + ':' + minute('dt'))

[Row(dt=datetime.datetime(2015, 9, 16, 0, 0),dyt=None)]

【问题讨论】:

  • 你能贴出你从镶木地板上读到的样子吗?
  • [Row(dt='2015-09-16 05:39:46')] , Row(dt='2015-09-16 05:40:46')]
  • 'zero323' ,感谢您的超级快速帮助!

标签: python datetime apache-spark apache-spark-sql pyspark


【解决方案1】:

火花 >= 2.3

你可以使用date_trunc

df.withColumn("dt_truncated", date_trunc("minute", col("dt"))).show()

## +-------------------+-------------------+
## |                 dt|       dt_truncated|
## +-------------------+-------------------+
## |1970-01-01 00:00:00|1970-01-01 00:00:00|
## |2015-09-16 05:39:46|2015-09-16 05:39:00|
## |2015-09-16 05:40:46|2015-09-16 05:40:00|
## |2016-03-05 02:00:10|2016-03-05 02:00:00|
## +-------------------+-------------------+

火花

转换为 Unix 时间戳和基本算术应该是诀窍:

from pyspark.sql import Row
from pyspark.sql.functions import col, unix_timestamp, round

df = sc.parallelize([
    Row(dt='1970-01-01 00:00:00'),
    Row(dt='2015-09-16 05:39:46'),
    Row(dt='2015-09-16 05:40:46'),
    Row(dt='2016-03-05 02:00:10'),
]).toDF()


## unix_timestamp converts string to Unix timestamp (bigint / long)
## in seconds. Divide by 60, round, multiply by 60 and cast
## should work just fine.
## 
dt_truncated = ((round(unix_timestamp(col("dt")) / 60) * 60)
    .cast("timestamp"))

df.withColumn("dt_truncated", dt_truncated).show(10, False)
## +-------------------+---------------------+
## |dt                 |dt_truncated         |
## +-------------------+---------------------+
## |1970-01-01 00:00:00|1970-01-01 00:00:00.0|
## |2015-09-16 05:39:46|2015-09-16 05:40:00.0|
## |2015-09-16 05:40:46|2015-09-16 05:41:00.0|
## |2016-03-05 02:00:10|2016-03-05 02:00:00.0|
## +-------------------+---------------------+

【讨论】:

  • 如果我只能访问 Spark 1.3,因此没有 'unix_timestamp' 功能,在 Spark SQL 或 DataFrame 中执行是否仍然容易?
  • 只要使用Hive UDF
【解决方案2】:

这个问题是几年前提出的,但如果其他人遇到过这个问题,从 Spark v2.3 开始,这已作为一项功能添加。现在这很简单(假设 canon_evt 是一个带有时间戳列 dt 的数据帧,我们要从中删除秒数)

from pyspark.sql.functions import date_trunc

canon_evt = canon_evt.withColumn('dt', date_trunc('minute', canon_evt.dt))

【讨论】:

    【解决方案3】:

    我认为 zero323 有最好的答案。考虑到实现起来非常容易,Spark 本身不支持这一点有点烦人。对于后代,这是我使用的一个函数:

    def trunc(date, format):
        """Wraps spark's trunc fuction to support day, minute, and hour"""
        import re
        import pyspark.sql.functions as func
    
        # Ghetto hack to get the column name from Column object or string:
        try:
            colname = re.match(r"Column<.?'(.*)'>", str(date)).groups()[0]
        except AttributeError:
            colname = date
    
        alias = "trunc(%s, %s)" % (colname, format)
    
        if format in ('year', 'YYYY', 'yy', 'month', 'mon', 'mm'):
            return func.trunc(date, format).alias(alias)
        elif format in ('day', 'DD'):
            return func.date_sub(date, 0).alias(alias)
        elif format in ('min', ):
            return ((func.round(func.unix_timestamp(date) / 60) * 60).cast("timestamp")).alias(alias)
        elif format in ('hour', ):
            return ((func.round(func.unix_timestamp(date) / 3600) * 3600).cast("timestamp")).alias(alias)
    

    【讨论】:

    • 谢谢!您的回答正是我想要找到的。
    • 请注意,对于 29 分钟以上的分钟,小时截断将四舍五入到下一个小时。12:15 将是 12:00,12:30 将是 13:00。如果您寻找截断功能,它可能不是您想要的功能。
    【解决方案4】:

    将时间戳截断为其他分钟,例如 5 分钟或 10 分钟或 7 分钟

    from pyspark.sql.functions import *
    df = spark.createDataFrame([("2016-03-11 09:00:07", 1, 5),("2016-03-11 09:00:57", 2, 5)]).toDF("date", "val1","val2")
    w = df.groupBy('val',window("date", "5 seconds")).agg(sum("val1").alias("sum"))
    w.select(w.window.start.cast("string").alias("start"),w.window.end.cast("string").alias("end"), "sum", "val").show(10, False)
    

    【讨论】:

      猜你喜欢
      • 2018-03-24
      • 2016-07-02
      • 2021-11-16
      • 1970-01-01
      • 2022-01-16
      • 2021-07-11
      • 2014-04-07
      • 2011-01-22
      • 1970-01-01
      相关资源
      最近更新 更多