【问题标题】:Looking for an inverse of pyspark's arrays_zip寻找 pyspark 的 arrays_zip 的逆
【发布时间】:2020-06-23 13:57:55
【问题描述】:

我有以下格式讨厌的输入数据框:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").getOrCreate()

input_df = spark.createDataFrame(
    [
        ('Alice;Bob;Carol',),
        ('12;13;14',),
        ('5;;7',),
        ('1;;3',),
        (';;3',)
    ],
    ['data']
)
  
input_df.show()

# +---------------+
# |           data|
# +---------------+
# |Alice;Bob;Carol|
# |       12;13;14|
# |           5;;7|
# |           1;;3|
# |            ;;3|
# +---------------+

实际输入是以分号分隔的 CSV 文件,其中一列包含一个人的值。每个人可以有不同数量的值。这里,Alice 有 3 个值,Bob 只有一个,Carol 有 4 个值。

我想在 PySpark 中将其转换为一个输出数据框,该数据框为每个人保存一个数组,在此示例中,输出为:

result = spark.createDataFrame(
    [
        ("Alice", [12, 5, 1]),
        ("Bob", [13,]),
        ("Carol", [14, 7, 3, 3])
    ],
    ['name', 'values']
)

result.show()

# +-----+-------------+
# | name|       values|
# +-----+-------------+
# |Alice|   [12, 5, 1]|
# |  Bob|         [13]|
# |Carol|[14, 7, 3, 3]|
# +-----+-------------+

我该怎么做?我想这将是F.arrays_zip()F.split() 和/或F.explode() 的某种组合,但我想不通。

我目前被困在这里,这是我目前的尝试:

(input_df
    .withColumn('splits', F.split(F.col('data'), ';'))
    .drop('data')
).show()

# +-------------------+
# |             splits|
# +-------------------+
# |[Alice, Bob, Carol]|
# |       [12, 13, 14]|
# |           [5, , 7]|
# |           [1, , 3]|
# |            [, , 3]|
# +-------------------+

【问题讨论】:

    标签: python apache-spark pyspark pyspark-dataframes


    【解决方案1】:

    一种方法可以是读取第一行作为标题,然后取消透视数据

    df1 = spark.createDataFrame([(12,13,14),(5,None,7),(1,None,3),(None,None,3)], ['Alice','Bob','Carol'])
    
    df1.show()
    +-----+----+-----+
    |Alice| Bob|Carol|
    +-----+----+-----+
    |   12|  13|   14|
    |    5|null|    7|
    |    1|null|    3|
    | null|null|    3|
    +-----+----+-----+
    
    df1.select(f.expr('''stack(3,'Alice',Alice,'Bob',Bob,'Carol',Carol) as (Name,Value)'''))\
       .groupBy('Name').agg(f.collect_list('value').alias('Value')).orderBy('Name').show()
    
    +-----+-------------+
    | Name|        Value|
    +-----+-------------+
    |Alice|   [12, 5, 1]|
    |  Bob|         [13]|
    |Carol|[14, 7, 3, 3]|
    +-----+-------------+
    
    

    要动态传递列,请使用以下代码

    cols = ','.join([f"'{i[0]}',{i[1]}" for i in zip(df1.columns,df1.columns)])
    df1.select(f.expr(f'''stack(3,{cols}) as (Name,Value)''')).groupBy('Name').agg(f.collect_list('value').alias('Value')).orderBy('Name').show()
    
    +-----+-------------+
    | Name|        Value|
    +-----+-------------+
    |Alice|   [12, 5, 1]|
    |  Bob|         [13]|
    |Carol|[14, 7, 3, 3]|
    +-----+-------------+
    

    【讨论】:

      【解决方案2】:

      Solution for Spark-2.4+:

      使用 groupBy 将所有行合并为一行,使用 collect_list 然后拆分以创建新列。

      • 使用 arrays_zip 压缩数组并创建嵌套数组[key,[values]]
      • 最后explode嵌套数组。

      Example:

      df.show()
      #+---------------+
      #|           data|
      #+---------------+
      #|Alice;Bob;Carol|
      #|       12;13;14|
      #|           5;;7|
      #|           1;;3|
      #|            ;;3|
      #+---------------+
      from pyspark.sql.functions import *
      
      df.agg(split(concat_ws("|",collect_list(col("data"))),"\\|").alias("tmp")).\
      withColumn("col1",split(element_at(col("tmp"),1),";")).\
      withColumn("col2",split(element_at(col("tmp"),2),";")).\
      withColumn("col3",split(element_at(col("tmp"),3),";")).\
      withColumn("col4",split(element_at(col("tmp"),4),";")).\
      withColumn("zip",arrays_zip(col("col1"),arrays_zip(col("col2"),col("col3"),col("col4")))).\
      selectExpr("explode(zip)as tmp").\
      selectExpr("tmp.*").\
      toDF("name","values").\
      show(10,False)
      
      #+-----+----------+
      #|name |values    |
      #+-----+----------+
      #|Alice|[12, 5, 1]|
      #|Bob  |[13, , ]  |
      #|Carol|[14, 7, 3]|
      #+-----+----------+
      

      对于 spark < 2.4 使用 udf 作为 arrays_zip 并使用 getItem(<n>) 而不是 element_at 函数。

      【讨论】:

      【解决方案3】:

      我建议将数据读取为; separeted csv,然后处理以获取namevalues 列,如下所示-

      请注意,此代码是用 scala 编写的,但类似的代码可以在 pyspark 中实现,只需很少的改动

      加载;分隔的csv

         val data =
            """
              |Alice;Bob;Carol
              |       12;13;14
              |           5;;7
              |           1;;3
              |            ;;3
            """.stripMargin
          val stringDS = data.split(System.lineSeparator())
            .map(_.split("\\;").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(";"))
            .toSeq.toDS()
          val df = spark.read
            .option("sep", ";")
            .option("inferSchema", "true")
            .option("header", "true")
            .option("nullValue", "null")
            .csv(stringDS)
          df.printSchema()
          df.show(false)
          /**
            * root
            * |-- Alice: integer (nullable = true)
            * |-- Bob: integer (nullable = true)
            * |-- Carol: integer (nullable = true)
            *
            * +-----+----+-----+
            * |Alice|Bob |Carol|
            * +-----+----+-----+
            * |12   |13  |14   |
            * |5    |null|7    |
            * |1    |null|3    |
            * |null |null|3    |
            * +-----+----+-----+
            */
      

      导出namevalues

      
          val columns = df.columns.map(c => expr(s"named_struct('name', '$c', 'values',  collect_list($c))"))
          df.select(array(columns: _*).as("array"))
            .selectExpr("inline_outer(array)")
            .show(false)
          /**
            * +-----+-------------+
            * |name |values       |
            * +-----+-------------+
            * |Alice|[12, 5, 1]   |
            * |Bob  |[13]         |
            * |Carol|[14, 7, 3, 3]|
            * +-----+-------------+
            */
      

      【讨论】:

        猜你喜欢
        • 2013-10-08
        • 1970-01-01
        • 2013-07-26
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-06-27
        • 2011-02-19
        相关资源
        最近更新 更多