【问题标题】:Spark: Iterating through columns in each row to create a new dataframeSpark:遍历每行中的列以创建新的数据框
【发布时间】:2019-12-16 15:30:43
【问题描述】:

假设我有一个这样的数据框:

+-----------+-----------+-----------+-----------+------------+--+
|   ColA    |   ColB    |   ColC    |   ColD    |    ColE    |  |
+-----------+-----------+-----------+-----------+------------+--+
| ''        | sample_1x | sample_1y | ''        | sample_1z  |  |
| sample2_x | sample2_y | ''        | ''        | ''         |  |
| sample3_x | ''        | ''        | ''        | sample3_y  |  |
| sample4_x | sample4_y | ''        | sample4_z | sample4_zz |  |
| sample5_x | ''        | ''        | ''        | ''         |  |
+-----------+-----------+-----------+-----------+------------+--+

我想创建另一个数据框,在每一行中显示从左到右的关系,同时跳过具有空值的列。也将排除只有 1 个有效的列式记录的行。例如:

+-----------+------------+-----------+
|   From    |     To     |   Label   |
+-----------+------------+-----------+
| sample1_x | sample1_y  | ColB_ColC |
| sample1_y | sample1_z  | ColC_ColE |
| sample2_x | sample2_y  | ColA_ColB |
| sample3_x | sample3_y  | ColA_ColE |
| sample4_x | sample4_y  | ColA_ColB |
| sample4_y | sample4_z  | ColB_ColD |
| sample4_z | sample4_zz | ColD_ColE |
+-----------+------------+-----------+

我认为这种方法是编写一个包含此逻辑的 UDF,但我不完全确定如何返回一个全新的 DF,因为我习惯于 UDF 只是在同一个 DF 中创建另一列.或者,如果有另一个 spark 函数可以比创建 UDF 更容易处理这种情况?如果这很重要,请使用 pyspark。

【问题讨论】:

    标签: dataframe apache-spark pyspark apache-spark-sql user-defined-functions


    【解决方案1】:

    你可以使用 udf,它接受一个数组参数并返回一个结构数组,例如:

    from pyspark.sql import functions as F
    
    df.show()
    +---------+---------+---------+---------+----------+
    |     ColA|     ColB|     ColC|     ColD|      ColE|
    +---------+---------+---------+---------+----------+
    |     null|sample_1x|sample_1y|     null| sample_1z|
    |sample2_x|sample2_y|     null|     null|      null|
    |sample3_x|     null|     null|     null| sample3_y|
    |sample4_x|sample4_y|     null|sample4_z|sample4_zz|
    |sample5_x|     null|     null|     null|      null|
    +---------+---------+---------+---------+----------+
    
    # columns that get involved, will group them into an array using F.array(cols)
    cols = df.columns
    
    # defind function to convert array into array of structs
    def find_route(arr, cols):
        d = [ (cols[i],e) for i,e in enumerate(arr) if e is not None ]
        return [ {'From':d[i][1], 'To':d[i+1][1], 'Label':d[i][0]+'_'+d[i+1][0]} for i in range(len(d)-1) ]
    
    # set up the UDF and add cols as an extra argument
    udf_find_route = F.udf(lambda a: find_route(a, cols), 'array<struct<From:string,To:string,Label:string>>')
    
    # retrive the data from the array of structs after array-explode
    df.select(F.explode(udf_find_route(F.array(cols))).alias('c1')).select('c1.*').show()
    +---------+----------+---------+
    |     From|        To|    Label|
    +---------+----------+---------+
    |sample_1x| sample_1y|ColB_ColC|
    |sample_1y| sample_1z|ColC_ColE|
    |sample2_x| sample2_y|ColA_ColB|
    |sample3_x| sample3_y|ColA_ColE|
    |sample4_x| sample4_y|ColA_ColB|
    |sample4_y| sample4_z|ColB_ColD|
    |sample4_z|sample4_zz|ColD_ColE|
    +---------+----------+---------+
    

    【讨论】:

      【解决方案2】:

      主要使用Spark SQL:

      df.createOrReplaceTempView("df")
      cols_df = df.columns
      qry = " union ".join([f"""
      select {enum_cols[1]} as From,
      {cols_df[enum_cols[0] + 1]} as To,
      '{enum_cols[1]}{cols_df[enum_cols[0] + 1]}' as Label from df where {enum_cols[1]} <> '' and {cols_df[enum_cols[0] + 1]} <> ''""" 
                    for enum_cols in enumerate(cols_df) if enum_cols[0] < len(cols_df) - 1])
      final_df = spark.sql(qry)
      

      【讨论】:

      • 三引号查询之前应该有一个 f 吗?我得到一个错误,在删除它并运行之后,我得到一个 ParseException,它以 "\nno 可行的替代输入 'select {'(line 1, pos 7)\n\n== SQL ==\nselect {enum_cols[1]} 作为 From,\n--------^^^\n {cols_df[enum_cols[0] + 1]} 作为 To, '{enum_cols[1]}{cols_df[enum_cols[0 ] + 1]}' 作为标签 where {enum_cols[1]} '' and {cols_df[enum_cols[0] + 1]} ''
      • 哦,我看到 f 使它成为格式化的字符串文字,嗯,尽管包含它,但我仍然遇到语法错误
      • f 用于字符串插值,我只是修改了答案,将两列值包装在 '' 中。它现在有效还是仍然出现错误?如果有,是哪一个?
      • 是的,指向三引号字符串末尾的语法错误
      • 尝试通过 pyspark 函数构造等效的 sql
      猜你喜欢
      • 1970-01-01
      • 2023-02-04
      • 1970-01-01
      • 2021-03-31
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-03-02
      • 2021-10-01
      相关资源
      最近更新 更多