【问题标题】:PySpark add a column to a DataFrame from a TimeStampType columnPySpark 从 TimeStampType 列向 DataFrame 添加一列
【发布时间】:2015-06-17 04:20:17
【问题描述】:

我有一个看起来像这样的 DataFrame。我想在date_time 字段的当天进行操作。

root
 |-- host: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- date_time: timestamp (nullable = true)

我尝试添加一列来提取日期。到目前为止,我的尝试都失败了。

df = df.withColumn("day", df.date_time.getField("day"))

org.apache.spark.sql.AnalysisException: GetField is not valid on fields of type TimestampType;

这也失败了

df = df.withColumn("day", df.select("date_time").map(lambda row: row.date_time.day))

AttributeError: 'PipelinedRDD' object has no attribute 'alias'

知道如何做到这一点吗?

【问题讨论】:

    标签: python apache-spark apache-spark-sql pyspark


    【解决方案1】:

    你可以使用简单的map:

    df.rdd.map(lambda row:
        Row(row.__fields__ + ["day"])(row + (row.date_time.day, ))
    )
    

    另一种选择是注册一个函数并运行 SQL 查询:

    sqlContext.registerFunction("day", lambda x: x.day)
    sqlContext.registerDataFrameAsTable(df, "df")
    sqlContext.sql("SELECT *, day(date_time) as day FROM df")
    

    最后你可以这样定义udf:

    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType
    
    day = udf(lambda date_time: date_time.day, IntegerType())
    df.withColumn("day", day(df.date_time))
    

    编辑

    实际上,如果您使用原始 SQL,day 函数已经定义(至少在 Spark 1.4 中),因此您可以省略 udf 注册。它还提供了许多不同的日期处理功能,包括:

    也可以使用简单的日期表达式,例如:

    current_timestamp() - expr("INTERVAL 1 HOUR")
    

    这意味着您可以构建相对复杂的查询,而无需将数据传递给 Python。例如:

    df =  sc.parallelize([
        (1, "2016-01-06 00:04:21"),
        (2, "2016-05-01 12:20:00"),
        (3, "2016-08-06 00:04:21")
    ]).toDF(["id", "ts_"])
    
    now = lit("2016-06-01 00:00:00").cast("timestamp") 
    five_months_ago = now - expr("INTERVAL 5 MONTHS")
    
    (df
        # Cast string to timestamp
        # For Spark 1.5 use cast("double").cast("timestamp")
        .withColumn("ts", unix_timestamp("ts_").cast("timestamp"))
        # Find all events in the last five months
        .where(col("ts").between(five_months_ago, now))
        # Find first Sunday after the event
        .withColumn("next_sunday", next_day(col("ts"), "Sun"))
        # Compute difference in days
        .withColumn("diff", datediff(col("ts"), col("next_sunday"))))
    

    【讨论】:

    • 栏目很多,我只想多加一栏。 map 方法可能过于繁琐,无法列出所有现有列。我将尝试注册功能的方式。谢谢。
    • 您不必列出地图中的所有现有列。可以简单地重新创建该行。我已经更新了答案以反映这一点。这种方法有两个问题。它返回 RDD of Rows 而不是 DataFrame,它很可能比优化的 SQL 慢。
    • 定义 udf 似乎是迄今为止我发现的最干净的方式。添加到答案中。
    【解决方案2】:

    res=df.withColumn("dayofts",dayofmonth("ts_"))
    from pyspark.sql import functions as F
    res=df.withColumn("dayofts",F.dayofmonth("ts_"))
    res.show()
    

    【讨论】:

      猜你喜欢
      • 2018-09-24
      • 1970-01-01
      • 1970-01-01
      • 2018-09-01
      • 2016-02-14
      • 1970-01-01
      • 1970-01-01
      • 2018-09-14
      相关资源
      最近更新 更多