【问题标题】:Spark: Merge 2 dataframes by adding row index/number on both dataframesSpark:通过在两个数据帧上添加行索引/编号来合并 2 个数据帧
【发布时间】:2016-11-09 13:44:23
【问题描述】:

问:在 PySpark 中有什么方法可以合并两个数据框或将一个数据框的一列复制到另一个?

例如,我有两个数据框:

DF1              
C1                    C2                                                        
23397414             20875.7353   
5213970              20497.5582   
41323308             20935.7956   
123276113            18884.0477   
76456078             18389.9269 

第二个数据帧

DF2
C3                       C4
2008-02-04               262.00                 
2008-02-05               257.25                 
2008-02-06               262.75                 
2008-02-07               237.00                 
2008-02-08               231.00 

然后我想像这样将 DF2 的 C3 添加到 DF1:

New DF              
    C1                    C2          C3                                              
    23397414             20875.7353   2008-02-04
    5213970              20497.5582   2008-02-05
    41323308             20935.7956   2008-02-06
    123276113            18884.0477   2008-02-07
    76456078             18389.9269   2008-02-08

我希望这个例子很清楚。

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql


    【解决方案1】:

    rownum + 窗口函数,即解决方案 1 或 zipWithIndex.map,即解决方案 2 在这种情况下应该有所帮助。

    解决方案1:你可以使用窗口函数来得到这个kind of

    那么我建议您将 rownumber 作为附加列名添加到 Dataframe 说 df1。

      DF1              
        C1                    C2                 columnindex                                             
        23397414             20875.7353            1
        5213970              20497.5582            2
        41323308             20935.7956            3
        123276113            18884.0477            4
        76456078             18389.9269            5
    

    第二个数据帧

    DF2
    C3                       C4             columnindex
    2008-02-04               262.00            1        
    2008-02-05               257.25            2      
    2008-02-06               262.75            3      
    2008-02-07               237.00            4          
    2008-02-08               231.00            5
    

    现在.. 进行 df1 和 df2 的内部连接,仅此而已... 你会得到下面的输出

    类似的东西

    from pyspark.sql.window import Window
    from pyspark.sql.functions import rowNumber
    
    w = Window().orderBy()
    
    df1 = ....  // as showed above df1
    
    df2 = ....  // as shown above df2
    
    
    df11 =  df1.withColumn("columnindex", rowNumber().over(w))
      df22 =  df2.withColumn("columnindex", rowNumber().over(w))
    
    newDF = df11.join(df22, df11.columnindex == df22.columnindex, 'inner').drop(df22.columnindex)
    newDF.show()
    
    
    
    New DF              
        C1                    C2          C3                                              
        23397414             20875.7353   2008-02-04
        5213970              20497.5582   2008-02-05
        41323308             20935.7956   2008-02-06
        123276113            18884.0477   2008-02-07
        76456078             18389.9269   2008-02-08
    

    解决方案 2:scala 中的另一种好方法(可能这是最好的:)),您可以将其转换为 pyspark :

    /**
    * Add Column Index to dataframe 
    */
    def addColumnIndex(df: DataFrame) = sqlContext.createDataFrame(
      // Add Column index
      df.rdd.zipWithIndex.map{case (row, columnindex) => Row.fromSeq(row.toSeq :+ columnindex)},
      // Create schema
      StructType(df.schema.fields :+ StructField("columnindex", LongType, false))
    )
    
    // Add index now...
    val df1WithIndex = addColumnIndex(df1)
    val df2WithIndex = addColumnIndex(df2)
    
     // Now time to join ...
    val newone = df1WithIndex
      .join(df2WithIndex , Seq("columnindex"))
      .drop("columnindex")
    

    【讨论】:

    • 使用下面的。 rowNumber 不再被使用 from pyspark.sql.window import Window from pyspark.sql.functions import row_number
    • row_number 也不适用于空白 orderBy()
    • 尝试在 python 中遵循解决方案 2。或低于已翻译的 pyspark 解决方案。 scala 中的第二种方法应该可以工作
    【解决方案2】:

    我想我会从@Ram Ghadiyaram 分享上面答案#2 的python (pyspark) 翻译:

    from pyspark.sql.functions import col
    def addColumnIndex(df): 
      # Create new column names
      oldColumns = df.schema.names
      newColumns = oldColumns + ["columnindex"]
    
      # Add Column index
      df_indexed = df.rdd.zipWithIndex().map(lambda (row, columnindex): \
                                             row + (columnindex,)).toDF()
    
      #Rename all the columns
      new_df = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], 
                      newColumns[idx]), xrange(len(oldColumns)), df_indexed)   
      return new_df
    
    # Add index now...
    df1WithIndex = addColumnIndex(df1)
    df2WithIndex = addColumnIndex(df2)
    
    #Now time to join ...
    newone = df1WithIndex.join(df2WithIndex, col("columnindex"),
                               'inner').drop("columnindex")
    

    【讨论】:

      【解决方案3】:

      对于python3版本,

      from pyspark.sql.types import StructType, StructField, LongType
      
      def with_column_index(sdf): 
          new_schema = StructType(sdf.schema.fields + [StructField("ColumnIndex", LongType(), False),])
          return sdf.rdd.zipWithIndex().map(lambda row: row[0] + (row[1],)).toDF(schema=new_schema)
      
      df1_ci = with_column_index(df1)
      df2_ci = with_column_index(df2)
      join_on_index = df1_ci.join(df2_ci, df1_ci.ColumnIndex == df2_ci.ColumnIndex, 'inner').drop("ColumnIndex")
      

      【讨论】:

        【解决方案4】:

        我参考了他(@Jed)的回答

        from pyspark.sql.functions import col
        def addColumnIndex(df): 
            # Get old columns names and add a column "columnindex"
            oldColumns = df.columns
            newColumns = oldColumns + ["columnindex"]
        
            # Add Column index
            df_indexed = df.rdd.zipWithIndex().map(lambda (row, columnindex): \
                                                 row + (columnindex,)).toDF()
            #Rename all the columns
            oldColumns = df_indexed.columns  
            new_df = reduce(lambda data, idx:data.withColumnRenamed(oldColumns[idx], 
                          newColumns[idx]), xrange(len(oldColumns)), df_indexed)   
            return new_df
        
        # Add index now...
        df1WithIndex = addColumnIndex(df1)
        df2WithIndex = addColumnIndex(df2)
        
        #Now time to join ...
        newone = df1WithIndex.join(df2WithIndex, col("columnindex"),
                                   'inner').drop("columnindex")
        

        【讨论】:

        • 与杰德的回答有什么不同?如果有差异,应该解释,如果没有,这不应该作为答案发布..
        【解决方案5】:

        This answer 为我解决了这个问题:

        import pyspark.sql.functions as sparkf
        
        # This will return a new DF with all the columns + id
        res = df.withColumn('id', sparkf.monotonically_increasing_id())
        

        感谢Arkadi T

        【讨论】:

        • 这不适用于连接两个数据框。 monotonically_increasing_id 函数不返回连续数字。不保证两个数据帧会为每个 df 中的行分配相同的整数
        • 我不同意两个数据框中的索引可能不同。请看下面我的代码
        • @justincress 是这样,为保证相同的 id 应在使用 monotonically_increasing_id 之前添加 .coalesce(1)
        【解决方案6】:

        这里有一个简单的例子,即使你已经解决了问题,它也可以帮助你。

          //create First Dataframe
          val df1 = spark.sparkContext.parallelize(Seq(1,2,1)).toDF("lavel1")
        
          //create second Dataframe
          val df2 = spark.sparkContext.parallelize(Seq((1.0, 12.1), (12.1, 1.3), (1.1, 0.3))). toDF("f1", "f2")
        
          //Combine both dataframe
          val combinedRow = df1.rdd.zip(df2.rdd). map({
            //convert both dataframe to Seq and join them and return as a row
            case (df1Data, df2Data) => Row.fromSeq(df1Data.toSeq ++ df2Data.toSeq)
          })
        //  create new Schema from both the dataframe's schema
          val combinedschema =  StructType(df1.schema.fields ++ df2.schema.fields)
        
        //  Create a new dataframe from new row and new schema
          val finalDF = spark.sqlContext.createDataFrame(combinedRow, combinedschema)
        
          finalDF.show
        

        【讨论】:

        • 注:Caused by: org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition
        【解决方案7】:

        扩展Jed's answer,回应Ajinkya的评论:

        要获得相同的旧列名,您需要将“old_cols”替换为新命名的索引列的列列表。下面看我修改后的函数版本

        def add_column_index(df):
            new_cols = df.schema.names + ['ix']
            ix_df = df.rdd.zipWithIndex().map(lambda (row, ix): row + (ix,)).toDF()
            tmp_cols = ix_df.schema.names
            return reduce(lambda data, idx: data.withColumnRenamed(tmp_cols[idx], new_cols[idx]), xrange(len(tmp_cols)), ix_df)
        

        【讨论】:

          【解决方案8】:

          在性能方面不是更好的方式。

          df3=df1.crossJoin(df2).show(3)
          

          【讨论】:

          • 这将创建所有行组合,而不是 OP 想要的
          【解决方案9】:

          要合并来自两个不同数据框的列,您首先需要创建一个列索引,然后连接两个数据框。实际上,两个数据框类似于两个 SQL 表。要建立联系,您必须加入他们。

          如果您不关心行的最终顺序,您可以使用 monotonically_increasing_id() 生成索引列。

          使用以下代码,您可以检查 monotonically_increasing_id 在两个数据帧中是否生成了相同的索引列(至少多达 10 亿行),因此您不会在合并的数据帧中出现任何错误。

          from pyspark.sql import SparkSession
          import pyspark.sql.functions as F
          
          sample_size = 1E9
          
          sdf1 = spark.range(1, sample_size).select(F.col("id").alias("id1"))
          sdf2 = spark.range(1, sample_size).select(F.col("id").alias("id2"))
          
          sdf1 = sdf1.withColumn("idx", sf.monotonically_increasing_id())
          sdf2 = sdf2.withColumn("idx", sf.monotonically_increasing_id())
          
          sdf3 = sdf1.join(sdf2, 'idx', 'inner')
          sdf3 = sdf3.withColumn("diff", F.col("id1")-F.col("id2")).select("diff")
          sdf3.filter(F.col("diff") != 0 ).show()
          

          【讨论】:

            【解决方案10】:

            您可以组合使用monotonically_increasing_id(保证始终递增)和row_number(保证始终给出相同的序列)。您不能单独使用row_number,因为它需要由某些东西订购。所以在这里我们通过monotonically_increasing_id 订购。我正在使用 Spark 2.3.1 和 Python 2.7.13。

            from pandas import DataFrame
            from pyspark.sql.functions import (
                monotonically_increasing_id,
                row_number)
            from pyspark.sql import Window
            
            DF1 = spark.createDataFrame(DataFrame({
                'C1': [23397414, 5213970, 41323308, 123276113, 76456078],
                'C2': [20875.7353, 20497.5582, 20935.7956, 18884.0477, 18389.9269]}))
            
            DF2 = spark.createDataFrame(DataFrame({
            'C3':['2008-02-04', '2008-02-05', '2008-02-06', '2008-02-07', '2008-02-08']}))
            
            DF1_idx = (
                DF1
                .withColumn('id', monotonically_increasing_id())
                .withColumn('columnindex', row_number().over(Window.orderBy('id')))
                .select('columnindex', 'C1', 'C2'))
            
            DF2_idx = (
                DF2
                .withColumn('id', monotonically_increasing_id())
                .withColumn('columnindex', row_number().over(Window.orderBy('id')))
                .select('columnindex', 'C3'))
            
            DF_complete = (
                DF1_idx
                .join(
                    other=DF2_idx,
                    on=['columnindex'],
                    how='inner')
                .select('C1', 'C2', 'C3'))
            
            DF_complete.show()
            
            +---------+----------+----------+
            |       C1|        C2|        C3|
            +---------+----------+----------+
            | 23397414|20875.7353|2008-02-04|
            |  5213970|20497.5582|2008-02-05|
            | 41323308|20935.7956|2008-02-06|
            |123276113|18884.0477|2008-02-07|
            | 76456078|18389.9269|2008-02-08|
            +---------+----------+----------+
            

            【讨论】:

            • 其中一位反对者愿意在这里提出一些建设性的批评吗?我不明白为什么这是一个不好的答案。
            • 不是我,但不幸的是,这种方法并不能保证两个DataFrame中的行顺序相同。我用一个 10 行的 DataFrame 进行了尝试,结果被打乱了。很遗憾,因为我不能使用 rdd 方法(我正在使用的集群上的 rdd API 未列入白名单)。
            猜你喜欢
            • 1970-01-01
            • 2019-08-11
            • 2021-03-10
            • 1970-01-01
            • 2020-02-29
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2021-10-27
            相关资源
            最近更新 更多