【问题标题】:Windowing Data into Rows in Pyspark在 Pyspark 中将数据窗口化为行
【发布时间】:2020-12-30 00:28:23
【问题描述】:

我正在准备一个数据集来开发一个监督模型来预测一个给定之前 5 个先前值的值。例如,给定下面的示例数据,我会预测给定列 1:5 的第 6 列,或给定列 3:7 的第 8 列。

id    1   2   3   4   5   6   7   8   9   10  11  12  13  14  15  16 ...
a    150 110 130  80 136 150 190 110 150 110 130 136 100 150 190 110
b    100 100 130 100 136 100 160 230 122 130  15 200 100 100 136 100
c    130 122 140 140 122 130  15 200 100 100 130 100 136 100 160 230

为此,我想将上面的示例数据重新组织为 6 列的行,并尽可能采用 6 个值的每个切片/窗口(例如 1:6、2:7、3:8)。我怎样才能做到这一点?在 PySpark/SQL 中可能吗?下面的输出示例,索引只是为了澄清:

           1   2   3   4   5   6   
a[1:6]    150 110 130  80 136 150 
a[2:7]    110 130  80 136 150 190
a[3:8]    130  80 136 150 190 110
...
c[1:6]    130 122 140 140 122 130
c[2:7]    122 140 140 122 130  15
...
c[10:16]  130 100 136 100 160 230

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql


    【解决方案1】:

    您可以将列转换为数组数组结构数组,然后展开,例如:

    from pyspark.sql.functions import struct, explode, array, col
    
    # all columns except the first
    cols = df.columns[1:]
    
    # size of the splits
    N = 6
    

    使用数组数组:

    df_new = df.withColumn('dta', explode(array(*[ array(*cols[i:i+N]) for i in range(len(cols)-N+1) ]))) \
        .select('id', *[ col('dta')[i].alias(str(i+1)) for i in range(N) ])
    df_new.show()
    +---+---+---+---+---+---+---+
    | id|  1|  2|  3|  4|  5|  6|
    +---+---+---+---+---+---+---+
    |  a|150|110|130| 80|136|150|
    |  a|110|130| 80|136|150|190|
    |  a|130| 80|136|150|190|110|
    |  a| 80|136|150|190|110|150|
    |  a|136|150|190|110|150|110|
    |  a|150|190|110|150|110|130|
    |  a|190|110|150|110|130|136|
    |  a|110|150|110|130|136|100|
    |  a|150|110|130|136|100|150|
    |  a|110|130|136|100|150|190|
    |  a|130|136|100|150|190|110|
    |  b|100|100|130|100|136|100|
    +---+---+---+---+---+---+---+
    

    使用结构数组(spark 2.4+):

    df_new = df.withColumn('dta', array(*cols)) \
        .selectExpr("id", f"""
          inline(transform(sequence(0,{len(cols)-N}), i -> ({','.join(f'dta[i+{j}] as `{j+1}`' for j in range(N))})))
        """)
    

    上面 f-string 中的代码与下面的代码相同,对于 N=6:

    inline(transform(sequence(0,10), i -> struct(dta[i] as `1`, dta[i+1] as `2`, dta[i+2] as `3`, dta[i+3] as `4`, dta[i+4] as `5`, dta[i+5] as `6`)))
    

    【讨论】:

      【解决方案2】:

      是的,您可以使用此代码(并对其进行修改以获得您需要的内容):

      partitions = []
      for row in df.rdd.toLocalIterator():
          row_list = list(row)
          num_elements = 6
      
          for i in range(0, len(row_list) - num_elements):
              partition = row[i : i+num_elements]
              partitions.append(partition)
      
      output_df = spark.createDataFrame(partitions)
      

      【讨论】:

        猜你喜欢
        • 2018-07-06
        • 1970-01-01
        • 2019-04-25
        • 2016-05-10
        • 2021-04-24
        • 2019-02-07
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多