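【Posted】: 2017-09-10 10:32:12
【Question】:
I read data from a CSV file, but it has no index column.
I want to add a column numbered from 1 to the number of rows.
How can I do this in Scala? Thanks.
【Question comments】:
Tags: scala apache-spark dataframe apache-spark-sql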
【Comments】:
df.withColumn("id",monotonicallyIncreasingId)
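monotonicallyIncreasingId is deprecated. As in the PySpark examples, the replacement function is monotonically_increasing_id().
monotonically_increasing_id: the generated IDs are guaranteed to be monotonically increasing and unique, but not consecutive.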
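Why the IDs are unique but not consecutive follows from how they are laid out. Here is a minimal sketch in plain Python, assuming the layout described in the Spark documentation (partition ID in the upper 31 bits, per-partition record number in the lower 33 bits). The helper name monotonic_ids and the partition sizes are made up for illustration; this models the numbering and is not Spark's own code:

```python
RECORD_BITS = 33  # lower 33 bits hold the record number within a partition


def monotonic_ids(partition_sizes):
    """Model the IDs assigned to rows, given the row count of each partition."""
    ids = []
    for partition_id, size in enumerate(partition_sizes):
        base = partition_id << RECORD_BITS  # partition ID goes in the upper bits
        ids.extend(base + record for record in range(size))
    return ids


# Hypothetical layout: 3 partitions holding 1, 2 and 1 rows.
ids = monotonic_ids([1, 2, 1])
print(ids)  # [0, 8589934592, 8589934593, 17179869184]
```

Every new partition jumps to the next multiple of 2^33, so the values only become consecutive when all rows sit in a single partition.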
"I want to add a column numbered from 1 to the number of rows."
Suppose we have the following DataFrame:
+--------+-------------+--------+
| userId | productCode | count  |
+--------+-------------+--------+
|     25 |        6001 |      2 |
|     11 |        5001 |      8 |
|     23 |         123 |      5 |
+--------+-------------+--------+
To generate IDs starting from 1:
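import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
val w = Window.orderBy("count") // no partitionBy: Spark moves all rows to a single partition
val result = df.withColumn("index", row_number().over(w))
This adds an index column numbered in ascending order of count.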
+--------+-------------+--------+-------+
| userId | productCode | count  | index |
+--------+-------------+--------+-------+
|     25 |        6001 |      2 |     1 |
|     23 |         123 |      5 |     2 |
|     11 |        5001 |      8 |     3 |
+--------+-------------+--------+-------+
【Comments】:
Can row_number() start from 0?
How to get a consecutive id column (1, 2, 3, 4, ..., n, or 0-based by subtracting 1):
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql.window import Window
# Order by monotonically_increasing_id() to keep the current row order, then number the rows
df_with_seq_id = df.withColumn('index_column_name', row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)
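Note that row_number() starts at 1, so subtract 1 (as above) if you want a 0-based index column, and drop the "- 1" if you want a 1-based one.
【Comments】:
Note: monotonically_increasing_id on its own does not give sequence numbers; it only gives increasing ids.
A simple way to get consecutive numbers while keeping the rows in order is zipWithIndex, as shown below.
Sample data: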
+-------------------+
| Name|
+-------------------+
| Ram Ghadiyaram|
| Ravichandra|
| ilker|
| nick|
| Naveed|
| Gobinathan SP|
|Sreenivas Venigalla|
| Jackela Kowski|
| Arindam Sengupta|
| Liangpi|
| Omar14|
| anshu kumar|
+-------------------+
package com.example

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row}

/**
  * DistributedDataIndex : Program to index an RDD with
  */
object DistributedDataIndex extends App with Logging {
  val spark = builder
    .master("local[*]")
    .appName(this.getClass.getName)
    .getOrCreate()

  import spark.implicits._

  val df = spark.sparkContext.parallelize(
    Seq("Ram Ghadiyaram", "Ravichandra", "ilker", "nick",
      "Naveed", "Gobinathan SP", "Sreenivas Venigalla", "Jackela Kowski",
      "Arindam Sengupta", "Liangpi", "Omar14", "anshu kumar"
    )).toDF("Name")
  df.show()

  logInfo("addColumnIndex here")
  // Add index now...
  val df1WithIndex = addColumnIndex(df)
    .withColumn("monotonically_increasing_id", monotonically_increasing_id())
  df1WithIndex.show(false)

  /**
    * Add Column Index to dataframe to each row
    */
  def addColumnIndex(df: DataFrame) = {
    spark.sqlContext.createDataFrame(
      // Append the 0-based position from zipWithIndex to every row
      df.rdd.zipWithIndex.map {
        case (row, index) => Row.fromSeq(row.toSeq :+ index)
      },
      // Create schema for index column
      StructType(df.schema.fields :+ StructField("index", LongType, false)))
  }
}
Result:
+-------------------+-----+---------------------------+
|Name |index|monotonically_increasing_id|
+-------------------+-----+---------------------------+
|Ram Ghadiyaram |0 |0 |
|Ravichandra |1 |8589934592 |
|ilker |2 |8589934593 |
|nick |3 |17179869184 |
|Naveed |4 |25769803776 |
|Gobinathan SP |5 |25769803777 |
|Sreenivas Venigalla|6 |34359738368 |
|Jackela Kowski |7 |42949672960 |
|Arindam Sengupta |8 |42949672961 |
|Liangpi |9 |51539607552 |
|Omar14 |10 |60129542144 |
|anshu kumar |11 |60129542145 |
+-------------------+-----+---------------------------+
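The consecutive index column comes from the way zipWithIndex numbers rows: first by partition index, then by position within each partition. That is why it has to know the row count of every earlier partition before it can assign offsets. A rough plain-Python model of that idea follows; the function name zip_with_index and the list-of-lists stand-in for partitions are assumptions for illustration, not Spark's implementation:

```python
from itertools import accumulate


def zip_with_index(partitions):
    """Pair every element with a consecutive 0-based index across partitions."""
    sizes = [len(p) for p in partitions]
    # Start offset of each partition = number of rows in all earlier partitions.
    starts = [0] + list(accumulate(sizes))[:-1]
    return [
        (item, start + i)
        for part, start in zip(partitions, starts)
        for i, item in enumerate(part)
    ]


# Stand-in for the first three partitions of the sample data; the layout is
# inferred from the monotonically_increasing_id values, assuming the
# partition ID sits in the upper bits of each ID.
parts = [["Ram Ghadiyaram"], ["Ravichandra", "ilker"], ["nick"]]
for name, index in zip_with_index(parts):
    print(index, name)
```

Adding 1 to each index gives the 1-based numbering asked for in the question.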
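【Comments】:
The monotonically_increasing_id column is included to show the difference between the two approaches.
As Ram said, zipWithIndex is a better choice than monotonically_increasing_id if you need consecutive row numbers. Try this (PySpark):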
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType
# Original schema plus a non-nullable LongType "index" field
new_schema = StructType(original_dataframe.schema.fields[:] + [StructField("index", LongType(), False)])
# zipWithIndex pairs each Row with a consecutive 0-based index
zipped_rdd = original_dataframe.rdd.zipWithIndex()
# row_with_index is the Row class described below
indexed = (zipped_rdd.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])).toDF(new_schema))
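Here original_dataframe is the DataFrame you want to add an index to, and row_with_index is a Row class that lists the original columns plus the index column. You can write it as: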
row_with_index = Row(
"calendar_date"
,"year_week_number"
,"year_period_number"
,"realization"
,"index"
)
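Here, calendar_date, year_week_number, year_period_number and realization are the columns of my original DataFrame. Replace them with your own column names. index is the name of the new column that holds the row number.
【Comments】: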
indexed = (zipped_rdd.map(lambda ri: Row(*list(ri[0]) + [ri[1]])).toDF(new_schema))
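If you need a unique sequence number for each row, I have a slightly different approach: add a static column and compute the row number over that column.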
val srcData = spark.read.option("header","true").csv("/FileStore/sample.csv")
srcData.show(5)
+--------+--------------------+
| Job| Name|
+--------+--------------------+
|Morpheus| HR Specialist|
| Kayla| Lawyer|
| Trisha| Bus Driver|
| Robert|Elementary School...|
| Ober| Judge|
+--------+--------------------+
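import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}
val srcDataModf = srcData.withColumn("sl_no", lit("1")) // constant helper column
val windowSpecRowNum = Window.partitionBy("sl_no").orderBy("sl_no") // one partition; a constant sort key leaves the row order to Spark
srcDataModf.withColumn("row_num", row_number().over(windowSpecRowNum)).drop("sl_no").select("row_num", "Name", "Job").show(5)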
+-------+--------------------+--------+
|row_num| Name| Job|
+-------+--------------------+--------+
| 1| HR Specialist|Morpheus|
| 2| Lawyer| Kayla|
| 3| Bus Driver| Trisha|
| 4|Elementary School...| Robert|
| 5| Judge| Ober|
+-------+--------------------+--------+
【Comments】:
For SparkR:
(assuming sdf is a Spark DataFrame)
sdf<- withColumn(sdf, "row_id", SparkR:::monotonically_increasing_id())
【Comments】: