【问题标题】:Pyspark - Shift column values based on other column valuePyspark - 根据其他列值移动列值
【发布时间】:2019-05-04 05:44:58
【问题描述】:

我需要根据数据框中的其他列向左移动列。请注意我使用的是 spark 2.1

ID  Col1 Col2 Col3 Col4 shift
1    1    2    3    4     1
2    5    6    7    8     3
3    9    10   11   12    2
4    13   14   15   16    0
5    17   18   19   20    5

预期输出:

ID  Col1 Col2 Col3 Col4
1    2    3     4   1
2    8    5     6   7
3    11   12    9   10
4    13   14    15  16
5    18   19    20  17

【问题讨论】:

    标签: pyspark


    【解决方案1】:

    您可以尝试自定义您的数组移位和旋转功能,然后使用F.udf():

    from pyspark.sql import functions as F
    from pyspark.sql.types import ArrayType, LongType
    
    df = spark.createDataFrame(
        [(1,1,2,3,4,1),(2,5,6,7,8,3),(3,9,10,11,12,2),(4,13,14,15,16,0),(5,17,18,19,20,5)]
      , ['Id','Col1','Col2','Col3','Col4','shift']
    )
    
    df.printSchema()
    #root
    # |-- Id: long (nullable = true)
    # |-- Col1: long (nullable = true)
    # |-- Col2: long (nullable = true)
    # |-- Col3: long (nullable = true)
    # |-- Col4: long (nullable = true)
    # |-- shift: long (nullable = true)
    
    # colume names to shift/rotate
    cols = df.columns[1:-1]
    #['Col1', 'Col2', 'Col3', 'Col4']
    
    #@F.udf("array<long>")
    def my_shift(arr, n):
        if n == 0: return arr
        arr_len = len(arr)
        return [ arr[(i+n)%arr_len] for i in range(arr_len) ]
    
    shift_udf = F.udf(my_shift, ArrayType(LongType()))
    
    # group the cols into an array and then run shift_udf(arr, n) to form 'new_arr' 
    df_new = (df.withColumn('arr', F.array([ F.col(c) for c in cols ]))
                .withColumn('new_arr', shift_udf('arr', 'shift'))          
                .select('ID', 'shift', 'arr', 'new_arr', *[ F.col('new_arr')[i].alias(cols[i]) for i in range(len(cols)) ])
             )          
    
    df_new.show()                                                                                                      
    #+---+-----+----------------+----------------+----+----+----+----+
    #| ID|shift|             arr|         new_arr|Col1|Col2|Col3|Col4|
    #+---+-----+----------------+----------------+----+----+----+----+
    #|  1|    1|    [1, 2, 3, 4]|    [2, 3, 4, 1]|   2|   3|   4|   1|
    #|  2|    3|    [5, 6, 7, 8]|    [8, 5, 6, 7]|   8|   5|   6|   7|
    #|  3|    2| [9, 10, 11, 12]| [11, 12, 9, 10]|  11|  12|   9|  10|
    #|  4|    0|[13, 14, 15, 16]|[13, 14, 15, 16]|  13|  14|  15|  16|
    #|  5|    5|[17, 18, 19, 20]|[18, 19, 20, 17]|  18|  19|  20|  17|
    #+---+-----+----------------+----------------+----+----+----+----+
    

    【讨论】:

    • @VivekReddy,从 1.3 版本开始支持 F.udf() 函数。我将 udf 的定义方式从使用简化的 returnType 调整为更正式的格式。你可以检查它是否有效。
    • @VivekReddy 还要确保为您的数据导入正确的类型。我在示例中使用LongType。即from pyspark.sql.types import ArrayType, LongType.
    【解决方案2】:

    尝试以下:

    
    from collections import deque
    
    def shift(row):
        l = list(row)[:-1]
        d = deque(l)
        s = row.shift
        d.rotate(s)
        return list(d)
    
    df.rdd.map(lambda x: shift(x)).toDF(df.columns[:-1])
    

    【讨论】:

    • 该解决方案对我不起作用。我不断收到“班次不在列表中”错误
    • 那么你有一个名为'shift'的列吗?
    猜你喜欢
    • 1970-01-01
    • 2016-11-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-12-21
    • 2020-04-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多