【问题标题】:Spark Dataframe :How to add a index Column : Aka Distributed Data IndexSpark Dataframe:如何添加索引列:Aka 分布式数据索引
【发布时间】:2017-09-10 10:32:12
【问题描述】:

我从 csv 文件中读取数据,但没有索引。

我想将一列从 1 添加到行号。

我该怎么办,谢谢 (scala)

【问题讨论】:

    标签: scala apache-spark dataframe apache-spark-sql


    【解决方案1】:

    在 Scala 中,您可以使用:

    import org.apache.spark.sql.functions._ 
    
    df.withColumn("id",monotonicallyIncreasingId)
    

    你可以参考这个exemple和scaladocs

    使用 Pyspark,您可以使用:

    from pyspark.sql.functions import monotonically_increasing_id 
    
    df_index = df.select("*").withColumn("id", monotonically_increasing_id())
    

    【讨论】:

    • 我想知道为什么您为 scala 编写的代码不适用于 pyspark。即df.withColumn("id",monotonicallyIncreasingId)
    • scala 代码有效。谢谢 但是我收到以下警告“警告:有一个弃用警告;使用 -deprecation 重新运行以获取详细信息”
    • monotonicallyIncreasingId 不保证“id”将是“从 1 到行号”。来自文档:spark.apache.org/docs/latest/api/java/org/apache/spark/sql/…“生成的 ID 保证单调递增且唯一,但不连续”
    • 我也注意到了。您可以使用以下 df.withColumn("id",monotonicallyIncreasingId+1) 使 id 以 1 开头。它在一定程度上可以正常工作,超过此 id 几乎是 15 位数。
    • 从 Spark 2.0.0 开始,函数 monotonicallyIncreasingId 已被弃用。与 PySpark 示例一样,对应的函数是 monotonically_increasing_id()
    【解决方案2】:

    monotonically_increasing_id - 生成的 ID 保证单调递增且唯一,但不连续。

    “我想将一列从 1 添加到行号。”

    假设我们有以下 DF

    +--------+-------------+--------+ |用户名 |产品代码 |计数 | +--------+-------------+--------+ | 25 | 6001 | 2 | | 11 | 5001 | 8 | | 23 | 123 | 5 | +--------+-------------+--------+

    从 1 开始生成 ID

    val w = Window.orderBy("count")
    val result = df.withColumn("index", row_number().over(w))
    

    这将添加一个按计数值递增排序的索引列。

    +--------+-------------+--------+-------+ |用户名 |产品代码 |计数 |索引 | +--------+-------------+--------+-------+ | 25 | 6001 | 2 | 1 | | 23 | 123 | 5 | 2 | | 11 | 5001 | 8 | 3 | +--------+-------------+--------+-------+

    【讨论】:

    • 以上适用于小型数据集;如果您的数据> 100M,则完成工作可能会有问题。经验之谈。
    • row_number()可以从0开始吗?
    【解决方案3】:

    如何获得一个连续的id列 id[1, 2, 3, 4...n]:

    from pyspark.sql.functions import desc, row_number, monotonically_increasing_id
    from pyspark.sql.window import Window
    
    df_with_seq_id = df.withColumn('index_column_name', row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)
    

    注意 row_number() 从 1 开始,因此如果你想要 0 索引列,则减 1

    【讨论】:

      【解决方案4】:

      注意:以上方法没有给出序列号,但它确实给出了递增的 id。

      执行此操作并确保索引顺序如下所示的简单方法。zipWithIndex

      样本数据。

      +-------------------+
      |               Name|
      +-------------------+
      |     Ram Ghadiyaram|
      |        Ravichandra|
      |              ilker|
      |               nick|
      |             Naveed|
      |      Gobinathan SP|
      |Sreenivas Venigalla|
      |     Jackela Kowski|
      |   Arindam Sengupta|
      |            Liangpi|
      |             Omar14|
      |        anshu kumar|
      +-------------------+
      

          package com.example
      
      import org.apache.spark.internal.Logging
      import org.apache.spark.sql.SparkSession._
      import org.apache.spark.sql.functions._
      import org.apache.spark.sql.types.{LongType, StructField, StructType}
      import org.apache.spark.sql.{DataFrame, Row}
      
      /**
        * DistributedDataIndex : Program to index an RDD  with
        */
      object DistributedDataIndex extends App with Logging {
      
        val spark = builder
          .master("local[*]")
          .appName(this.getClass.getName)
          .getOrCreate()
      
        import spark.implicits._
      
        val df = spark.sparkContext.parallelize(
          Seq("Ram Ghadiyaram", "Ravichandra", "ilker", "nick"
            , "Naveed", "Gobinathan SP", "Sreenivas Venigalla", "Jackela Kowski", "Arindam Sengupta", "Liangpi", "Omar14", "anshu kumar"
          )).toDF("Name")
        df.show
        logInfo("addColumnIndex here")
        // Add index now...
        val df1WithIndex = addColumnIndex(df)
          .withColumn("monotonically_increasing_id", monotonically_increasing_id)
        df1WithIndex.show(false)
      
        /**
          * Add Column Index to dataframe to each row
          */
        def addColumnIndex(df: DataFrame) = {
          spark.sqlContext.createDataFrame(
            df.rdd.zipWithIndex.map {
              case (row, index) => Row.fromSeq(row.toSeq :+ index)
            },
            // Create schema for index column
            StructType(df.schema.fields :+ StructField("index", LongType, false)))
        }
      }
      

      结果:

      +-------------------+-----+---------------------------+
      |Name               |index|monotonically_increasing_id|
      +-------------------+-----+---------------------------+
      |Ram Ghadiyaram     |0    |0                          |
      |Ravichandra        |1    |8589934592                 |
      |ilker              |2    |8589934593                 |
      |nick               |3    |17179869184                |
      |Naveed             |4    |25769803776                |
      |Gobinathan SP      |5    |25769803777                |
      |Sreenivas Venigalla|6    |34359738368                |
      |Jackela Kowski     |7    |42949672960                |
      |Arindam Sengupta   |8    |42949672961                |
      |Liangpi            |9    |51539607552                |
      |Omar14             |10   |60129542144                |
      |anshu kumar        |11   |60129542145                |
      +-------------------+-----+---------------------------+
      

      【讨论】:

      • 显示添加monotonically_increasing_id作为列的两种方法之间的区别
      • internal.logging包有什么用
      • @stack0114106 spark 使用这个我在我的应用程序中使用的相同
      【解决方案5】:

      正如 Ram 所说,zippedwithindex 比单调递增的 id 更好,id 你需要连续的行号。试试这个(PySpark 环境):

      from pyspark.sql import Row
      from pyspark.sql.types import StructType, StructField, LongType
      
      new_schema = StructType(**original_dataframe**.schema.fields[:] + [StructField("index", LongType(), False)])
      zipped_rdd = **original_dataframe**.rdd.zipWithIndex()
      indexed = (zipped_rdd.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])).toDF(new_schema))
      

      其中 original_dataframe 是您必须添加索引的数据框,row_with_index 是具有列索引的新架构,您可以将其写为

      row_with_index = Row(
      "calendar_date"
      ,"year_week_number"
      ,"year_period_number"
      ,"realization"
      ,"index"
      )
      

      在这里,calendar_dateyear_week_numberyear_period_number 和实现是我原始数据框的列。您可以将名称替换为列的名称。 index 是您必须为行号添加的新列名。

      【讨论】:

      • 无需编写新架构(除非您想要代码可解释性)=> indexed = (zipped_rdd.map(lambda ri: Row(*list(ri[0]) + [ri[1]])).toDF(new_schema))
      【解决方案6】:

      如果您需要每行唯一的序列号,我有一个稍微不同的方法,其中添加了一个静态列并用于使用该列计算行号。

      val srcData = spark.read.option("header","true").csv("/FileStore/sample.csv")
      srcData.show(5)
      
      +--------+--------------------+
      |     Job|                Name|
      +--------+--------------------+
      |Morpheus|       HR Specialist|
      |   Kayla|              Lawyer|
      |  Trisha|          Bus Driver|
      |  Robert|Elementary School...|
      |    Ober|               Judge|
      +--------+--------------------+
      
      val srcDataModf = srcData.withColumn("sl_no",lit("1"))
      val windowSpecRowNum =  Window.partitionBy("sl_no").orderBy("sl_no")
      
      srcDataModf.withColumn("row_num",row_number.over(windowSpecRowNum)).drop("sl_no").select("row_num","Name","Job")show(5)
      
      +-------+--------------------+--------+
      |row_num|                Name|     Job|
      +-------+--------------------+--------+
      |      1|       HR Specialist|Morpheus|
      |      2|              Lawyer|   Kayla|
      |      3|          Bus Driver|  Trisha|
      |      4|Elementary School...|  Robert|
      |      5|               Judge|    Ober|
      +-------+--------------------+--------+
      

      【讨论】:

        【解决方案7】:

        对于 SparkR:

        (假设 sdf 是某种 spark 数据帧)

        sdf<- withColumn(sdf, "row_id", SparkR:::monotonically_increasing_id())

        【讨论】:

          猜你喜欢
          • 2013-09-30
          • 1970-01-01
          • 2016-11-08
          • 2016-05-03
          • 2018-11-04
          • 2018-09-08
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多