【问题标题】:Column filtering in PySparkPySpark 中的列过滤
【发布时间】:2015-07-14 07:19:51
【问题描述】:

我有一个从 Hive 表加载的数据框df,它有一个时间戳列,比如ts,格式为dd-MMM-yy hh.mm.ss.MS a 的字符串类型(转换为python 日期时间库,这是%d-%b-%y %I.%M.%S.%f %p)。

现在我想从数据框中筛选出过去五分钟的行:

only_last_5_minutes = df.filter(
    datetime.strptime(df.ts, '%d-%b-%y %I.%M.%S.%f %p') > datetime.now() - timedelta(minutes=5)
)

但是,这不起作用,我收到此消息

TypeError: strptime() argument 1 must be string, not Column

看起来我对列操作的应用有误,在我看来,我必须创建一个 lambda 函数来过滤满足所需条件的每一列,但作为 Python 和 lambda 表达式的新手,我没有不知道如何正确创建我的过滤器。请指教。

附: 我更喜欢将我的过滤器表示为 Python 本机(或 SparkSQL),而不是 Hive sql 查询表达式“WHERE”中的过滤器。

首选:

df = sqlContext.sql("SELECT * FROM my_table")
df.filter( // filter here)

不喜欢:

df = sqlContext.sql("SELECT * FROM my_table WHERE...")

【问题讨论】:

    标签: python lambda apache-spark apache-spark-sql pyspark


    【解决方案1】:

    火花 >= 1.5

    从 Spark 1.5 开始,您可以按如下方式解析日期字符串:

    from pyspark.sql.functions import expr, from_unixtime, lit, unix_timestamp
    from pyspark.sql.types import TimestampType
    
    parsed_df = df.select((from_unixtime(unix_timestamp(
        # Note: am-pm: pattern length should be 1 for Spark >= 3.0
        df.datetime, "dd-MMM-yy h.mm.ss.SSSSSS a"  
    ))).cast(TimestampType()).alias("datetime"))
    
    parsed_df.where(col("datetime") >= lit(now) - expr("INTERVAL 5 minutes"))
    

    然后应用间隔:

    from pyspark.sql.functions import current_timestamp, expr
    

    火花

    可以使用用户自定义函数。

    from datetime import datetime, timedelta
    from pyspark.sql.types import BooleanType, TimestampType
    from pyspark.sql.functions import udf, col
    
    def in_last_5_minutes(now):
        def _in_last_5_minutes(then):
            then_parsed = datetime.strptime(then, '%d-%b-%y %I.%M.%S.%f %p')
            return then_parsed > now - timedelta(minutes=5)
        return udf(_in_last_5_minutes, BooleanType())
    

    使用一些虚拟数据:

    df = sqlContext.createDataFrame([
        (1, '14-Jul-15 11.34.29.000000 AM'),
        (2, '14-Jul-15 11.34.27.000000 AM'),
        (3, '14-Jul-15 11.32.11.000000 AM'),
        (4, '14-Jul-15 11.29.00.000000 AM'),
        (5, '14-Jul-15 11.28.29.000000 AM')
    ], ('id', 'datetime'))
    
    now = datetime(2015, 7, 14, 11, 35)
    df.where(in_last_5_minutes(now)(col("datetime"))).show()
    

    正如预期的那样,我们只得到 3 个条目:

    +--+--------------------+
    |id|            datetime|
    +--+--------------------+
    | 1|14-Jul-15 11.34.2...|
    | 2|14-Jul-15 11.34.2...|
    | 3|14-Jul-15 11.32.1...|
    +--+--------------------+
    

    重新解析日期时间字符串效率相当低,因此您可以考虑存储TimestampType

    def parse_dt():
        def _parse(dt):
            return datetime.strptime(dt, '%d-%b-%y %I.%M.%S.%f %p')
        return udf(_parse, TimestampType())
    
    df_with_timestamp = df.withColumn("timestamp", parse_dt()(df.datetime))
    
    def in_last_5_minutes(now):
        def _in_last_5_minutes(then):
            return then > now - timedelta(minutes=5)
        return udf(_in_last_5_minutes, BooleanType())
    
    df_with_timestamp.where(in_last_5_minutes(now)(col("timestamp")))
    

    结果:

    +--+--------------------+--------------------+
    |id|            datetime|           timestamp|
    +--+--------------------+--------------------+
    | 1|14-Jul-15 11.34.2...|2015-07-14 11:34:...|
    | 2|14-Jul-15 11.34.2...|2015-07-14 11:34:...|
    | 3|14-Jul-15 11.32.1...|2015-07-14 11:32:...|
    +--+--------------------+--------------------+
    

    终于可以使用带有时间戳的原始 SQL 查询了:

    query = """SELECT * FROM df
         WHERE unix_timestamp(datetime, 'dd-MMM-yy HH.mm.ss.SSSSSS a') > {0}
         """.format(time.mktime((now - timedelta(minutes=5)).timetuple()))
    
    sqlContext.sql(query)
    

    同上,一次解析日期字符串会更高效。

    如果列已经是timestamp,则可以使用datetime 文字:

    from pyspark.sql.functions import lit
    
    df_with_timestamp.where(
        df_with_timestamp.timestamp > lit(now - timedelta(minutes=5)))
    

    【讨论】:

      【解决方案2】:
      from pyspark.sql.functions import *
      df.withColumn("seconds_from_now", current_timestamp() - col("ts").cast("long"))
      df = df.filter(df.seconds_from_now <= 5*60).drop("seconds_from_now")
      

      df 是包含最后五分钟结果的结果数据框。

      【讨论】:

        猜你喜欢
        • 2020-07-08
        • 2018-03-24
        • 1970-01-01
        • 2020-08-19
        • 1970-01-01
        • 1970-01-01
        • 2019-03-07
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多