【问题标题】:How to subtract a column of days from a column of dates in Pyspark?如何从 Pyspark 中的一列日期中减去一列天数?
【发布时间】:2016-07-03 06:07:13
【问题描述】:

给定以下 PySpark 数据帧

df = sqlContext.createDataFrame([('2015-01-15', 10),
                                 ('2015-02-15', 5)],
                                 ('date_col', 'days_col'))

如何从日期列中减去天数列?在此示例中,结果列应为 ['2015-01-05', '2015-02-10']

我查看了pyspark.sql.functions.date_sub(),但它需要一个日期列和一天,即date_sub(df['date_col'], 10)。理想情况下,我更喜欢date_sub(df['date_col'], df['days_col'])

我也尝试过创建 UDF:

from datetime import timedelta
def subtract_date(start_date, days_to_subtract):
    return start_date - timedelta(days_to_subtract)

subtract_date_udf = udf(subtract_date, DateType())
df.withColumn('subtracted_dates', subtract_date_udf(df['date_col'], df['days_col'])

这在技术上是可行的,但我了解到,在 Spark 和 Python 之间切换可能会导致大型数据集出现性能问题。我现在可以坚持使用这个解决方案(无需过早优化),但我的直觉认为,必须有一种方法可以在不使用 Python UDF 的情况下完成这个简单的事情。

【问题讨论】:

    标签: python apache-spark pyspark apache-spark-sql user-defined-functions


    【解决方案1】:

    使用 expr 函数(如果您有 dynamic values 从列中减去):

    >>> from pyspark.sql.functions import *
    >>> df.withColumn('substracted_dates',expr("date_sub(date_col,days_col)"))
    

    使用withColumn函数(如果你有literal values进行减法):

    >>> df.withColumn('substracted_dates',date_sub('date_col',<int_literal_value>))
    

    【讨论】:

    • 嗨,舒,我认为这不起作用,因为date_sub 将一列和一个整数值作为参数。它仅适用于expr,例如:df=df.withColumn('substracted_dates', F.expr('date_sub(date_col, day_col)')
    【解决方案2】:

    我可以使用selectExpr 解决这个问题。

    df.selectExpr('date_sub(date_col, day_col) as subtracted_dates')
    

    如果要将列附加到原始 DF,只需将 * 添加到表达式中

    df.selectExpr('*', 'date_sub(date_col, day_col) as subtracted_dates')
    

    【讨论】:

    • 如果您不介意输入 SQL,您实际上可以将其简化为 df.select(expr("date_sub({0}, {1})".format("date_col", "days_col"))),这样编写起来就很简单了。
    【解决方案3】:

    这不是有史以来最优雅的解决方案,但如果您不想破解 Scala 中的 SQL 表达式(并不是说这应该很难,但这些是 sql 私有的),这样的事情应该可以解决问题:

    from pyspark.sql import Column
    
    def date_sub_(c1: Column, c2: Column) -> Column:
        return ((c1.cast("timestamp").cast("long") - 60 * 60 * 24 * c2)
            .cast("timestamp").cast("date"))
    

    对于 Python 2.x,只需删除类型注释。

    【讨论】:

    • 聪明。我想我使用selectExpr 找到了一个更优雅的解决方案,但感谢您的帮助!
    【解决方案4】:

    格式略有不同,但也可以:

    df.registerTempTable("dfTbl")
    
    newdf = spark.sql("""
                         SELECT *, date_sub(d.date_col, d.day_col) AS DateSub 
                         FROM dfTbl d
                       """)
    

    【讨论】:

      猜你喜欢
      • 2010-10-01
      • 2020-12-30
      • 1970-01-01
      • 1970-01-01
      • 2021-03-28
      • 2020-03-17
      • 1970-01-01
      • 2017-07-30
      • 1970-01-01
      相关资源
      最近更新 更多