【Question title】: Time filtering a struct column in a PySpark DataFrame
【Posted】: 2021-06-27 20:14:34
【Question】:

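I have a DataFrame whose columns are structs that each hold an array of dates and an array of values, so the schema looks like this: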

root
 |-- col1: struct (nullable = true)
 |    |-- dates: array (nullable = true)
 |    |    |-- element: timestamp (containsNull = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |-- col2: struct (nullable = true)
 |    |-- dates: array (nullable = true)
 |    |    |-- element: timestamp (containsNull = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |-- id: string (nullable = true)

Given some time index:

time_index = datetime.datetime(2015, 12, 12, 4, 45)

and a number of days before and after it:

min_diff = -1
max_diff = 2

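I want new columns col1_filt and col2_filt with the same struct layout, containing only the dates that fall inside the window defined by time_index, min_diff, and max_diff, together with their corresponding values. If no dates or values fall inside that window, I want the column to hold None.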
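To make the window concrete, the bounds for the values above can be worked out with plain `datetime` arithmetic. This is a minimal sketch: the question does not say whether the boundaries are inclusive, so the strict comparison is an assumption, and the names `lower`, `upper`, and `in_window` are illustrative only.

```python
import datetime

time_index = datetime.datetime(2015, 12, 12, 4, 45)
min_diff = -1  # days before time_index
max_diff = 2   # days after time_index

# Window bounds derived from the day offsets
lower = time_index + datetime.timedelta(days=min_diff)  # 2015-12-11 04:45
upper = time_index + datetime.timedelta(days=max_diff)  # 2015-12-14 04:45

def in_window(ts):
    # Assumption: both boundaries are exclusive
    return lower < ts < upper

print(in_window(datetime.datetime(2015, 12, 13, 4, 48)))  # True
print(in_window(datetime.datetime(2015, 12, 15, 5, 8)))   # False
```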

Below is an example DataFrame to work with:

import datetime
from pyspark.sql import Row

example_input = [
    Row(
        id = "A", 
        col1 = Row(
            dates = [datetime.datetime(2015, 12, 11, 5, 28), datetime.datetime(2015, 12, 12, 4, 45), datetime.datetime(2015, 12, 13, 5, 9)], 
            values = [17.7, 19.1, 19.1]
        ),
        col2 = Row(
            dates = [datetime.datetime(2015, 12, 13, 4, 48), datetime.datetime(2015, 12, 15, 5, 8)], 
            values = [19.1, 19.1]
        )
    ),
    Row(
        id = "B", 
        col1 = Row(
            dates = [datetime.datetime(2017, 1, 13, 5, 9)], 
            values = [19.1]
        ),
        col2 = Row(
            dates = [datetime.datetime(2017, 1, 12, 2, 48), datetime.datetime(2017, 1, 15, 5, 8)], 
            values = [19.5, 29.1]
        )
    ),
]

# Assumes an active SparkSession bound to `spark`
df = spark.createDataFrame(example_input)

Showing df:

+-------------------------------------------------------------------------------------+----------------------------------------------------------+---+
|col1                                                                                 |col2                                                      |id |
+-------------------------------------------------------------------------------------+----------------------------------------------------------+---+
|[[2015-12-11 05:28:00, 2015-12-12 04:45:00, 2015-12-13 05:09:00], [17.7, 19.1, 19.1]]|[[2015-12-13 04:48:00, 2015-12-15 05:08:00], [19.1, 19.1]]|A  |
|[[2017-01-13 05:09:00], [19.1]]                                                      |[[2017-01-12 02:48:00, 2017-01-15 05:08:00], [19.5, 29.1]]|B  |
+-------------------------------------------------------------------------------------+----------------------------------------------------------+---+

I have some code that takes a PySpark Row object and returns a filtered PySpark Row object, but I'm not sure how to turn it into a UDF.

【Comments】:

    Tags: python apache-spark filter pyspark user-defined-functions


    【Solution 1】:

    Here is an example that does the filtering with a UDF:

    import datetime
    import pyspark.sql.functions as F
    from pyspark.sql import Row
    
    time_index = datetime.datetime(2015, 12, 12, 4, 45)
    min_diff = -1
    max_diff = 2
    
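    # Window bounds, computed once instead of once per element
    lower = time_index + datetime.timedelta(days=min_diff)
    upper = time_index + datetime.timedelta(days=max_diff)
    
    def time_filter(r):
        # Keep the (date, value) pairs that fall strictly inside the window
        kept = [(d, v) for d, v in zip(r['dates'], r['values']) if lower < d < upper]
        if not kept:
            return None  # no date falls inside the window
        dates, values = zip(*kept)
        return Row(dates=list(dates), values=list(values))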
    
    time_filter_udf = F.udf(time_filter, 'struct<dates:array<timestamp>,values:array<double>>')
    
    df2 = df.withColumn('col1_filt', time_filter_udf('col1')).withColumn('col2_filt', time_filter_udf('col2'))
    
    df2.show(truncate=False)
    +-------------------------------------------------------------------------------------+----------------------------------------------------------+---+-------------------------------------------------------------------------------------+-------------------------------+
    |col1                                                                                 |col2                                                      |id |col1_filt                                                                            |col2_filt                      |
    +-------------------------------------------------------------------------------------+----------------------------------------------------------+---+-------------------------------------------------------------------------------------+-------------------------------+
    |[[2015-12-11 05:28:00, 2015-12-12 04:45:00, 2015-12-13 05:09:00], [17.7, 19.1, 19.1]]|[[2015-12-13 04:48:00, 2015-12-15 05:08:00], [19.1, 19.1]]|A  |[[2015-12-11 05:28:00, 2015-12-12 04:45:00, 2015-12-13 05:09:00], [17.7, 19.1, 19.1]]|[[2015-12-13 04:48:00], [19.1]]|
    |[[2017-01-13 05:09:00], [19.1]]                                                      |[[2017-01-12 02:48:00, 2017-01-15 05:08:00], [19.5, 29.1]]|B  |null                                                                                 |null                           |
    +-------------------------------------------------------------------------------------+----------------------------------------------------------+---+-------------------------------------------------------------------------------------+-------------------------------+
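One way to make the filter easier to test is to keep the window logic in a plain Python function and let the UDF be a thin wrapper around it. The following is a sketch under that assumption: `filter_window` and its argument names are hypothetical and not part of the answer above. It also returns None for a null array and skips null timestamps, cases the UDF above does not guard against.

```python
import datetime

def filter_window(dates, values, lower, upper):
    """Return (dates, values) restricted to lower < date < upper, or None."""
    if dates is None or values is None:
        return None  # null array inside the struct
    kept = [
        (d, v) for d, v in zip(dates, values)
        if d is not None and lower < d < upper  # skip null timestamps
    ]
    if not kept:
        return None  # nothing falls inside the window
    kept_dates, kept_values = zip(*kept)
    return list(kept_dates), list(kept_values)

time_index = datetime.datetime(2015, 12, 12, 4, 45)
lower = time_index + datetime.timedelta(days=-1)
upper = time_index + datetime.timedelta(days=2)

# col2 of row "A" from the example data: only the first date is in the window
dates = [datetime.datetime(2015, 12, 13, 4, 48), datetime.datetime(2015, 12, 15, 5, 8)]
values = [19.1, 19.1]
result = filter_window(dates, values, lower, upper)

# col2 of row "B": no date is in the window, so the result is None
result_b = filter_window(
    [datetime.datetime(2017, 1, 12, 2, 48), datetime.datetime(2017, 1, 15, 5, 8)],
    [19.5, 29.1],
    lower,
    upper,
)
print(result, result_b)
```

A UDF wrapper would then call `filter_window(r['dates'], r['values'], lower, upper)` when `r` is not None and wrap a non-None result in `Row(dates=..., values=...)`, so the Spark-specific part stays a few lines long.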
    

    【Discussion】:
