【问题标题】:Spark Dataframe distinguish columns with duplicated nameSpark Dataframe 区分名称重复的列
【发布时间】:2016-02-20 02:33:14
【问题描述】:

据我所知,在 Spark Dataframe 中,多个列可以具有相同的名称,如下面的数据帧快照所示:

[
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=125231, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=145831, f=SparseVector(5, {0: 0.0, 1: 0.2356, 2: 0.0036, 3: 0.0, 4: 0.4132})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=147031, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=149231, f=SparseVector(5, {0: 0.0, 1: 0.0032, 2: 0.2451, 3: 0.0, 4: 0.0042}))
]

上面的结果是通过将数据框连接到自身创建的,您可以看到4 列有两个af

问题是当我尝试使用a 列进行更多计算时,我找不到选择a 的方法,我尝试了df[0]df.select('a'),都返回了以下错误消息:

AnalysisException: Reference 'a' is ambiguous, could be: a#1333L, a#1335L.

Spark API 中是否有我可以再次将列与重复名称区分开来?或者也许可以让我更改列名?

【问题讨论】:

    标签: python apache-spark dataframe pyspark apache-spark-sql


    【解决方案1】:

    让我们从一些数据开始:

    from pyspark.mllib.linalg import SparseVector
    from pyspark.sql import Row
    
    df1 = sqlContext.createDataFrame([
        Row(a=107831, f=SparseVector(
            5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
        Row(a=125231, f=SparseVector(
            5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
    ])
    
    df2 = sqlContext.createDataFrame([
        Row(a=107831, f=SparseVector(
            5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
        Row(a=107831, f=SparseVector(
            5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
    ])
    

    有几种方法可以解决这个问题。首先,您可以使用父列明确引用子表列:

    df1.join(df2, df1['a'] == df2['a']).select(df1['f']).show(2)
    
    ##  +--------------------+
    ##  |                   f|
    ##  +--------------------+
    ##  |(5,[0,1,2,3,4],[0...|
    ##  |(5,[0,1,2,3,4],[0...|
    ##  +--------------------+
    

    您还可以使用表别名:

    from pyspark.sql.functions import col
    
    df1_a = df1.alias("df1_a")
    df2_a = df2.alias("df2_a")
    
    df1_a.join(df2_a, col('df1_a.a') == col('df2_a.a')).select('df1_a.f').show(2)
    
    ##  +--------------------+
    ##  |                   f|
    ##  +--------------------+
    ##  |(5,[0,1,2,3,4],[0...|
    ##  |(5,[0,1,2,3,4],[0...|
    ##  +--------------------+
    

    最后您可以通过编程方式重命名列:

    df1_r = df1.select(*(col(x).alias(x + '_df1') for x in df1.columns))
    df2_r = df2.select(*(col(x).alias(x + '_df2') for x in df2.columns))
    
    df1_r.join(df2_r, col('a_df1') == col('a_df2')).select(col('f_df1')).show(2)
    
    ## +--------------------+
    ## |               f_df1|
    ## +--------------------+
    ## |(5,[0,1,2,3,4],[0...|
    ## |(5,[0,1,2,3,4],[0...|
    ## +--------------------+
    

    【讨论】:

    • 感谢您的编辑展示了在这些模棱两可的情况下获得正确列的多种方法,我认为您的示例应该进入 Spark 编程指南。我学到了很多东西!
    • 小修正:df2_r = **df2** .select(*(col(x).alias(x + '_df2') for x in df2.columns)) 而不是df2_r = df1.select(*(col(x).alias(x + '_df2') for x in df2.columns))。剩下的,好东西
    • 我同意这应该是 Spark 编程指南的一部分。纯金。在进行连接之前,我终于能够通过旧名称来解开歧义的来源。在连接所有歧义之前以编程方式将后缀附加到列名的解决方案。
    • @resec : 你明白为什么需要重命名df1_a = df1.alias("df1_a") 以及为什么我们不能直接使用df1df2 吗?这个答案没有解释为什么需要重命名才能使 select('df1_a.f') 工作
    • @Sheldore 它适用于原始问题,其中有一个表df 与其自身连接。如果解决方案写了df.alias("df1_a")df.alias("df2_a"),也许解决方案会更有意义。
    【解决方案2】:

    我建议您更改 join 的列名。

    df1.select(col("a") as "df1_a", col("f") as "df1_f")
       .join(df2.select(col("a") as "df2_a", col("f") as "df2_f"), col("df1_a" === col("df2_a"))
    

    生成的DataFrame 将有schema

    (df1_a, df1_f, df2_a, df2_f)
    

    【讨论】:

    • 您可能需要修正您的答案,因为在列名之间没有正确调整引号。
    • @SamehSharaf 我认为您是反对我的答案的人?但答案实际上是 100% 正确的——我只是使用 scala '-shorthand 进行列选择,所以引号实际上没有问题。
    • @GlennieHellesSindholt,公平点。这令人困惑,因为答案被标记为pythonpyspark
    • 如果每个数据框包含 100 多列,我们只需要重命名一个相同的列名怎么办?当然,不能在 select 子句中手动输入所有这些列名
    • 在这种情况下你可以选择df1.withColumnRenamed("a", "df1_a")
    【解决方案3】:

    有一种比为您要加入的所有列编写别名更简单的方法:

    df1.join(df2,['a'])
    

    如果您要加入的键在两个表中相同,则此方法有效。

    https://kb.databricks.com/data/join-two-dataframes-duplicated-columns.html

    【讨论】:

    • 这是 Spark 2+ 的实际答案
    • 对于 Scala:df1.join(df2, Seq("a"))
    • 页面已移至:kb.databricks.com/data/…
    • 很高兴我一直在滚动,这是更好的答案。如果列具有不同的名称,则没有歧义问题。如果列具有相同的名称,请执行此操作。几乎没有理由每次都需要使用这种方法处理模棱两可的列名。
    【解决方案4】:

    您可以使用def drop(col: Column)方法删除重复的列,例如:

    DataFrame:df1
    
    +-------+-----+
    | a     | f   |
    +-------+-----+
    |107831 | ... |
    |107831 | ... |
    +-------+-----+
    
    DataFrame:df2
    
    +-------+-----+
    | a     | f   |
    +-------+-----+
    |107831 | ... |
    |107831 | ... |
    +-------+-----+
    

    当我加入 df1 和 df2 时,DataFrame 将如下所示:

    val newDf = df1.join(df2,df1("a")===df2("a"))
    
    DataFrame:newDf
    
    +-------+-----+-------+-----+
    | a     | f   | a     | f   |
    +-------+-----+-------+-----+
    |107831 | ... |107831 | ... |
    |107831 | ... |107831 | ... |
    +-------+-----+-------+-----+
    

    现在,我们可以使用def drop(col: Column) 方法删除重复的列'a'或'f',如下所示:

    val newDfWithoutDuplicate = df1.join(df2,df1("a")===df2("a")).drop(df2("a")).drop(df2("f"))
    

    【讨论】:

    • 如果您正在执行外连接并且两列具有一些不同的值,这种方法是否有效?
    • 如果不同的关系与相同的架构,你可能不想放弃。
    【解决方案5】:

    这就是我们如何在 PySpark 中在相同的列名称上连接两个 Dataframe。

    df = df1.join(df2, ['col1','col2','col3'])
    

    如果您在此之后执行printSchema(),那么您可以看到重复的列已被删除。

    【讨论】:

      【解决方案6】:

      假设您要加入的 DataFrame 是 df1 和 df2,并且您在列 'a' 上加入它们,那么您有 2 个方法

      方法一

      df1.join(df2,'a','left_outer')

      这是一个很棒的方法,强烈推荐。

      方法二

      df1.join(df2,df1.a == df2.a,'left_outer').drop(df2.a)

      【讨论】:

        【解决方案7】:

        深入研究 Spark API 后,我发现我可以先使用 alias 为原始数据框创建别名,然后使用 withColumnRenamed 手动重命名别名上的每一列,这将完成 join不会导致列名重复。

        更多详情可以参考下面Spark Dataframe API

        pyspark.sql.DataFrame.alias

        pyspark.sql.DataFrame.withColumnRenamed

        但是,我认为这只是一个麻烦的解决方法,并且想知道是否有更好的方法来解决我的问题。

        【讨论】:

          【解决方案8】:

          这可能不是最好的方法,但是如果你想重命名重复的列(在连接之后),你可以使用这个小函数来实现。

          def rename_duplicate_columns(dataframe):
              columns = dataframe.columns
              duplicate_column_indices = list(set([columns.index(col) for col in columns if columns.count(col) == 2]))
              for index in duplicate_column_indices:
                  columns[index] = columns[index]+'2'
              dataframe = dataframe.toDF(*columns)
              return dataframe
          

          【讨论】:

            【解决方案9】:

            如果两个表中只有键列相同,则尝试使用以下方式(方法 1):

            left. join(right , 'key', 'inner')
            

            而不是下面(方法2):

            left. join(right , left.key == right.key, 'inner')
            

            使用方法 1 的优点:

            • “key”将在最终数据框中仅显示一次
            • 易于使用的语法

            使用方法 1 的缺点:

            • 仅对键列有帮助
            • 场景,其中左连接的情况,如果计划使用右键空计数,这将不起作用。在这种情况下,必须重命名上述密钥之一。

            【讨论】:

              【解决方案10】:

              如果您的用例比 Glennie Helles Sindholt 的回答中描述的更复杂,例如您有其他/很少的非连接列名称也相同,并且希望在选择最好使用别名时区分它们,例如:

              df3 = df1.select("a", "b").alias("left")\
                 .join(df2.select("a", "b").alias("right"), ["a"])\
                 .select("left.a", "left.b", "right.b")
              
              df3.columns
              ['a', 'b', 'b']
              

              【讨论】:

                【解决方案11】:

                什么对我有用

                import databricks.koalas as ks
                
                df1k = df1.to_koalas()
                df2k = df2.to_koalas()
                df3k = df1k.merge(df2k, on=['col1', 'col2'])
                df3 = df3k.to_spark()
                

                除了 col1 和 col2 之外的所有列,如果它们来自 df1,则在其名称后附加“_x”,如果它们来自 df2,则附加“_y”,这正是我所需要的。

                【讨论】:

                  猜你喜欢
                  • 2021-12-18
                  • 1970-01-01
                  • 1970-01-01
                  • 1970-01-01
                  • 2019-03-02
                  • 2017-04-26
                  • 1970-01-01
                  • 1970-01-01
                  • 2016-02-22
                  相关资源
                  最近更新 更多