【问题标题】:How to split a list to multiple columns in Pyspark?如何在 Pyspark 中将列表拆分为多列?
【发布时间】:2018-01-29 01:53:22
【问题描述】:

我有:

key   value
a    [1,2,3]
b    [2,3,4]

我想要:

key value1 value2 value3
a     1      2      3
b     2      3      4

似乎在scala中我可以写:df.select($"value._1", $"value._2", $"value._3"),但在python中是不可能的。

那么有什么好办法吗?

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql


    【解决方案1】:

    这取决于你的“列表”的类型:

    • 如果是ArrayType()类型:

      df = hc.createDataFrame(sc.parallelize([['a', [1,2,3]], ['b', [2,3,4]]]), ["key", "value"])
      df.printSchema()
      df.show()
      root
       |-- key: string (nullable = true)
       |-- value: array (nullable = true)
       |    |-- element: long (containsNull = true)
      

      您可以像使用 python 一样使用 [] 访问这些值:

      df.select("key", df.value[0], df.value[1], df.value[2]).show()
      +---+--------+--------+--------+
      |key|value[0]|value[1]|value[2]|
      +---+--------+--------+--------+
      |  a|       1|       2|       3|
      |  b|       2|       3|       4|
      +---+--------+--------+--------+
      
      +---+-------+
      |key|  value|
      +---+-------+
      |  a|[1,2,3]|
      |  b|[2,3,4]|
      +---+-------+
      
    • 如果它是 StructType() 类型:(也许你通过读取 JSON 来构建你的数据框)

      df2 = df.select("key", psf.struct(
              df.value[0].alias("value1"), 
              df.value[1].alias("value2"), 
              df.value[2].alias("value3")
          ).alias("value"))
      df2.printSchema()
      df2.show()
      root
       |-- key: string (nullable = true)
       |-- value: struct (nullable = false)
       |    |-- value1: long (nullable = true)
       |    |-- value2: long (nullable = true)
       |    |-- value3: long (nullable = true)
      
      +---+-------+
      |key|  value|
      +---+-------+
      |  a|[1,2,3]|
      |  b|[2,3,4]|
      +---+-------+
      

      您可以使用* 直接“拆分”列:

      df2.select('key', 'value.*').show()
      +---+------+------+------+
      |key|value1|value2|value3|
      +---+------+------+------+
      |  a|     1|     2|     3|
      |  b|     2|     3|     4|
      +---+------+------+------+
      

    【讨论】:

    • 使用*拆分StructType的列时可以重命名列吗?
    • 添加到答案,对于 arraytype 动态执行,您可以执行类似 df2.select(['key'] + [df2.features[x] for x in range(0,3 )])
    【解决方案2】:

    我想将大小列表(数组)的情况添加到 pault 答案中。

    如果我们的列包含中等大小的数组(或大型数组),仍然可以将它们拆分为列。

    from pyspark.sql.types import *          # Needed to define DataFrame Schema.
    from pyspark.sql.functions import expr   
    
    # Define schema to create DataFrame with an array typed column.
    mySchema = StructType([StructField("V1", StringType(), True),
                           StructField("V2", ArrayType(IntegerType(),True))])
    
    df = spark.createDataFrame([['A', [1, 2, 3, 4, 5, 6, 7]], 
                                ['B', [8, 7, 6, 5, 4, 3, 2]]], schema= mySchema)
    
    # Split list into columns using 'expr()' in a comprehension list.
    arr_size = 7
    df = df.select(['V1', 'V2']+[expr('V2[' + str(x) + ']') for x in range(0, arr_size)])
    
    # It is posible to define new column names.
    new_colnames = ['V1', 'V2'] + ['val_' + str(i) for i in range(0, arr_size)] 
    df = df.toDF(*new_colnames)
    

    结果是:

    df.show(truncate= False)
    
    +---+---------------------+-----+-----+-----+-----+-----+-----+-----+
    |V1 |V2                   |val_0|val_1|val_2|val_3|val_4|val_5|val_6|
    +---+---------------------+-----+-----+-----+-----+-----+-----+-----+
    |A  |[1, 2, 3, 4, 5, 6, 7]|1    |2    |3    |4    |5    |6    |7    |
    |B  |[8, 7, 6, 5, 4, 3, 2]|8    |7    |6    |5    |4    |3    |2    |
    +---+---------------------+-----+-----+-----+-----+-----+-----+-----+
    

    【讨论】:

      【解决方案3】:

      @jordi Aceiton 感谢您的解决方案。 我试图使它更简洁,试图删除用于重命名新创建的列名的循环,在创建列时这样做。 使用 df.columns 获取所有列名,而不是手动创建。

          from pyspark.sql.types import *          
          from pyspark.sql.functions import * 
          from pyspark import Row
      
          df = spark.createDataFrame([Row(index=1, finalArray = [1.1,2.3,7.5], c =4),Row(index=2, finalArray = [9.6,4.1,5.4], c= 4)])
          #collecting all the column names as list
          dlist = df.columns
          #Appending new columns to the dataframe
          df.select(dlist+[(col("finalArray")[x]).alias("Value"+str(x+1)) for x in range(0, 3)]).show()
      

      输出:

           +---------------+-----+------+------+------+
           |  finalArray   |index|Value1|Value2|Value3|
           +---------------+-----+------+------+------+
           |[1.1, 2.3, 7.5]|  1  |   1.1|   2.3|   7.5|
           |[9.6, 4.1, 5.4]|  2  |   9.6|   4.1|   5.4|
           +---------------+-----+------+------+------+
      

      【讨论】:

      • NameError: name 'col' is not defined
      【解决方案4】:

      我需要将 712 维数组取消列出到列中,以便将其写入 csv。我首先使用@MaFF 的解决方案来解决我的问题,但这似乎会导致很多错误和额外的计算时间。我不确定是什么原因造成的,但我使用了一种不同的方法,大大减少了计算时间(22 分钟与 4 多个小时相比)!

      @MaFF 的方法:

      length = len(dataset.head()["list_col"])
      dataset = dataset.select(dataset.columns + [dataset["list_col"][k] for k in range(length)])
      

      我用了什么:

      dataset = dataset.rdd.map(lambda x: (*x, *x["list_col"])).toDF()
      

      如果有人知道导致计算时间差异的原因,请告诉我!我怀疑在我的情况下,瓶颈是调用head() 来获取列表长度(我希望它是自适应的)。而且因为 (i) 我的数据管道非常长且详尽,并且 (ii) 我不得不取消列出多个列。此外,缓存整个数据集不是一种选择。

      【讨论】:

        【解决方案5】:

        对于 arraytype 数据,要动态执行,您可以执行类似的操作

        df2.select(['key'] + [df2.features[x] for x in range(0,3)])
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 2021-06-30
          • 2020-11-10
          • 2018-10-07
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2017-04-22
          相关资源
          最近更新 更多