【问题标题】:Pyspark Replicate Row based on column valuePyspark 根据列值复制行
【发布时间】:2018-12-09 01:53:20
【问题描述】:

我想根据每行上给定列的值复制我的 DataFrame 中的所有行,然后索引每个新行。假设我有:

Column A Column B
T1       3
T2       2

我希望结果是:

Column A Column B Index
T1       3        1
T1       3        2
T1       3        3
T2       2        1
T2       2        2

我能够使用固定值进行类似的操作,但不能使用列上的信息。我当前的固定值工作代码是:

idx = [lit(i) for i in range(1, 10)]
df = df.withColumn('Index', explode(array( idx ) ))

我试图改变:

lit(i) for i in range(1, 10) 

lit(i) for i in range(1, df['Column B'])

并将其添加到我的 array() 函数中:

df = df.withColumn('Index', explode(array( lit(i) for i in range(1, df['Column B']) ) ))

但它不起作用(TypeError: 'Column' 对象不能被解释为整数)。

我应该如何实现这个?

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql pyspark-sql


    【解决方案1】:

    很遗憾,您不能像那样iterate over a Column。您始终可以使用udf,但如果您使用的是 Spark 2.1 或更高版本,我确实有一个非 udf hack 解决方案应该适合您。

    诀窍是利用pyspark.sql.functions.posexplode() 来获取索引值。我们通过重复逗号Column B 次来创建一个字符串。然后我们用逗号分割这个字符串,用posexplode得到索引。

    df.createOrReplaceTempView("df")  # first register the DataFrame as a temp table
    
    query = 'SELECT '\
        '`Column A`,'\
        '`Column B`,'\
        'pos AS Index '\
        'FROM ( '\
            'SELECT DISTINCT '\
            '`Column A`,'\
            '`Column B`,'\
            'posexplode(split(repeat(",", `Column B`), ",")) '\
            'FROM df) AS a '\
        'WHERE a.pos > 0'
    newDF = sqlCtx.sql(query).sort("Column A", "Column B", "Index")
    newDF.show()
    #+--------+--------+-----+
    #|Column A|Column B|Index|
    #+--------+--------+-----+
    #|      T1|       3|    1|
    #|      T1|       3|    2|
    #|      T1|       3|    3|
    #|      T2|       2|    1|
    #|      T2|       2|    2|
    #+--------+--------+-----+
    

    注意:您需要将列名包含在反引号中,因为它们中有空格,如本文所述:How to express a column which name contains spaces in Spark SQL

    【讨论】:

    • 伟大的黑客攻击@pault。但我有一个查询,重复中的Column B 如何被视为原语而不是 sql 表达式中的列。在 api 形式中它被视为列,不是吗?
    • @Ramesh 我在使用 DataFrame 函数时遇到了麻烦。我不知道为什么它在查询中起作用。
    • @RameshMaharjan 我发布了一个关于此行为的question
    【解决方案2】:
    You can try this:
    
        from pyspark.sql.window import Window
        from pyspark.sql.functions import *
        from pyspark.sql.types import ArrayType, IntegerType
        from pyspark.sql import functions as F
        df = spark.read.csv('/FileStore/tables/stack1.csv', header = 'True', inferSchema = 'True')
    
        w = Window.orderBy("Column A")
        df = df.select(row_number().over(w).alias("Index"), col("*"))
    
        n_to_array = udf(lambda n : [n] * n ,ArrayType(IntegerType()))
        df2 = df.withColumn('Column B', n_to_array('Column B'))
        df3= df2.withColumn('Column B', explode('Column B'))
        df3.show()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-11-20
      • 1970-01-01
      • 2019-09-19
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多