【问题标题】:adding a unique consecutive row number to dataframe in pyspark向pyspark中的数据框添加唯一的连续行号
【发布时间】:2019-04-04 14:12:32
【问题描述】:

我想在 pyspark 中将唯一的行号添加到我的数据框中,并且不想使用 monotonicallyIncreasingId 和 partitionBy 方法。 我认为这个问题可能是之前提出的类似问题的重复,无论我是否以正确的方式进行操作,仍在寻找一些建议。 以下是我的代码的 sn-p: 我有一个带有以下输入记录集的 csv 文件:

1,VIKRANT SINGH RANA    ,NOIDA   ,10000
3,GOVIND NIMBHAL        ,DWARKA  ,92000
2,RAGHVENDRA KUMAR GUPTA,GURGAON ,50000
4,ABHIJAN SINHA         ,SAKET   ,65000
5,SUPER DEVELOPER       ,USA     ,50000
6,RAJAT TYAGI           ,UP      ,65000
7,AJAY SHARMA           ,NOIDA   ,70000
8,SIDDHARTH BASU        ,SAKET   ,72000
9,ROBERT                ,GURGAON ,70000

我已将此 csv 文件加载到数据框中。

PATH_TO_FILE="file:///u/user/vikrant/testdata/EMP_FILE.csv"

emp_df = spark.read.format("com.databricks.spark.csv") \
  .option("mode", "DROPMALFORMED") \
  .option("header", "true") \
  .option("inferschema", "true") \
  .option("delimiter", ",").load(PATH_TO_FILE)

+------+--------------------+--------+----------+
|emp_id|            emp_name|emp_city|emp_salary|
+------+--------------------+--------+----------+
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|
|     5|SUPER DEVELOPER  ...|USA     |     50000|
|     6|RAJAT TYAGI      ...|UP      |     65000|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|
|     9|ROBERT           ...|GURGAON |     70000|
+------+--------------------+--------+----------+

empRDD = emp_df.rdd.zipWithIndex()
newRDD=empRDD.map(lambda x: (list(x[0]) + [x[1]]))
 newRDD.take(2);
[[1, u'VIKRANT SINGH RANA    ', u'NOIDA   ', 10000, 0], [3, u'GOVIND NIMBHAL        ', u'DWARKA  ', 92000, 1]]

当我将 int 值包含到我的列表中时,我丢失了数据框架构。

newdf=newRDD.toDF(['emp_id','emp_name','emp_city','emp_salary','row_id'])
newdf.show();

+------+--------------------+--------+----------+------+
|emp_id|            emp_name|emp_city|emp_salary|row_id|
+------+--------------------+--------+----------+------+
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|     0|
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|     1|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|     2|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|     3|
|     5|SUPER DEVELOPER  ...|USA     |     50000|     4|
|     6|RAJAT TYAGI      ...|UP      |     65000|     5|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|     6|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|     7|
|     9|ROBERT           ...|GURGAON |     70000|     8|
+------+--------------------+--------+----------+------+

我做得对吗?或者有没有更好的方法在 pyspark 中添加或保留数据框的模式?

是否也可以使用 zipWithIndex 方法为大尺寸数据框添加唯一的连续行号?我们可以使用这个row_id对dataframe进行重新分区,从而将数据均匀分布在partitions之间吗?

【问题讨论】:

  • 丢失模式到底是什么意思?它是否以非整数列开始但随后移至字符串?还有为什么你不想使用单调递增的 id?
  • 单调增加不是增加我想要的连续增量。它只是向我的数据框添加随机唯一数字。并且带有窗口函数的 partitionby 将 n 个分区数据带入一个分区
  • 丢失的模式我的意思是在将 rdd 转换为数据框时。我还必须指定列名。将rdd转换为rdd然后从rdd转换为dataframe时,有什么方法可以保留rdd的架构。
  • 在分区集上执行row_number,然后按您的选择进行排序。为什么要使用 RDD?将所有DataFrames/DataSets分解为RDD后,并不意味着你需要使用它。不惜一切代价避免 RDD。而且,您是否丢失了订单或架构?我确定这是订单而不是架构。
  • 我想我可以将 partitionby 子句与窗口函数一起使用,而不是仅使用 order by.. 这样数据不会移动到单个分区.. 我会尝试一下。

标签: csv dataframe pyspark rdd


【解决方案1】:

使用 Spark SQL:

df = spark.sql("""
SELECT 
    row_number() OVER (
        PARTITION BY '' 
        ORDER BY '' 
    ) as id,
    *
FROM 
    VALUES 
    ('Bob  ', 20),
    ('Alice', 21),
    ('Gary ', 21),
    ('Kent ', 25),
    ('Gary ', 35)
""")

输出:

>>> df.printSchema()
root
 |-- id: integer (nullable = true)
 |-- col1: string (nullable = false)
 |-- col2: integer (nullable = false)

>>> df.show()
+---+-----+----+
| id| col1|col2|
+---+-----+----+
|  1|Bob  |  20|
|  2|Alice|  21|
|  3|Gary |  21|
|  4|Kent |  25|
|  5|Gary |  35|
+---+-----+----+

【讨论】:

    【解决方案2】:

    我找到了一个解决方案,而且非常简单。 因为我的数据框中没有在所有行中具有相同值的列,所以在将它与 partitionBy 子句一起使用时,使用 row_number 不会生成唯一的行号。

    让我们在现有数据框中添加一个新列,其中包含一些默认值。

    emp_df= emp_df.withColumn("new_column",lit("ABC"))
    

    并使用该列“new_column”创建一个带有分区的窗口函数

    w = Window().partitionBy('new_column').orderBy(lit('A'))
    df = emp_df.withColumn("row_num", row_number().over(w)).drop("new_column")
    

    你会得到想要的结果:

    +------+--------------------+--------+----------+-------+
    |emp_id|            emp_name|emp_city|emp_salary|row_num|
    +------+--------------------+--------+----------+-------+
    |     1|VIKRANT SINGH RAN...|NOIDA   |     10000|      1|
    |     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|      2|
    |     7|AJAY SHARMA      ...|NOIDA   |     70000|      3|
    |     9|ROBERT           ...|GURGAON |     70000|      4|
    |     4|ABHIJAN SINHA    ...|SAKET   |     65000|      5|
    |     8|SIDDHARTH BASU   ...|SAKET   |     72000|      6|
    |     5|SUPER DEVELOPER  ...|USA     |     50000|      7|
    |     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|      8|
    |     6|RAJAT TYAGI      ...|UP      |     65000|      9|
    +------+--------------------+--------+----------+-------+
    

    【讨论】:

    • 更简单的方法:withColumn("index",F.row_number().over(Window.orderBy(monotonically_increasing_id()))-1)
    • @zhaoyufei 为什么最后要加-1
    • @PikoMonde 我的用法是生成一个范围从 0 到某个数字的索引,所以我添加了 -1。如果你的用例是从 1 开始的,你可以删除 -1 部分,没关系。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-03-17
    • 1970-01-01
    • 1970-01-01
    • 2023-03-22
    • 1970-01-01
    相关资源
    最近更新 更多