【问题标题】:How to add new Column in pyspark and insert multiple values with based on rows?如何在 pyspark 中添加新列并根据行插入多个值?
【发布时间】:2020-11-04 05:57:31
【问题描述】:

我是 pyspark 的新手。 我想添加一个具有多个值的新列以及具有这些值的分区。

import math

coun=df.count()

if(coun<= 20000):
    chunksize=2
    rowsperchunk = math.ceil(coun/2)
else:
    chunksize= math.ceil(coun/20000)
    rowsperchunk = 20000

for i in chunksize:
    df.limit(num_rows_per_chunk).withColumn('chunk',F.lit(i))

在上面的for循环中,它只会插入1个值直到限制

示例: 我的数据框中有 100k 行,因此块大小为 5。 每个块的行数为 20 000 所以我需要添加新列,首先需要插入值为 1 的 20 000 行,然后需要插入值为 2 的接下来的 20 000 行。直到块大小结束。然后我想根据我们创建的新列进行分区

【问题讨论】:

  • 您找到想要的答案了吗?

标签: python apache-spark pyspark apache-spark-sql


【解决方案1】:

因此,您希望对数据进行重新分区,以便在相同大小的分区中进行分区,同时保持顺序。

在火花中并不那么容易。我要做的是从计算每个分区的大小开始。然后,对于每个分区,我将计算先前分区中数据帧中的记录数。有了这个和分区中记录的等级(partition_rank),除以所需分区的大小将为我提供新的分配。请注意,我引入了一个 index 列来计算排名并保留顺序。代码如下:

partition_size = 20000

from pyspark.sql import functions as F
part_counts = df.withColumn("p", F.spark_partition_id()).groupBy("p").count().collect()
part_counts.sort()
part_counts = [(x[0], x[1]) for x in part_counts]

cum_part_counts = []
sum=0
for index, count in part_counts:
    cum_part_counts.append((index, sum))
    sum+=count
cum_part_counts_df = spark.createDataFrame(cum_part_counts, ['partition_index', 'count'])

repartitioned_df = df\
  .withColumn("partition_index", F.spark_partition_id())\
  .withColumn("index", F.monotonically_increasing_id())\
  .withColumn("partition_rank", F.rank().over(
           Window.partitionBy("partition_index").orderBy("index")))\
  .join(cum_part_counts_df, ['partition_index'])\
  .withColumn("new_partition",
      F.floor((F.col("count") + F.col("partition_rank") - 1)/partition_size))\
  .orderBy("index")\
  .write.partitionBy("new_partition").parquet("...")

【讨论】:

    猜你喜欢
    • 2020-08-09
    • 1970-01-01
    • 1970-01-01
    • 2021-10-19
    • 2020-07-29
    • 2012-12-21
    • 1970-01-01
    • 2021-11-15
    • 2021-07-03
    相关资源
    最近更新 更多