【问题标题】:Spark repartitioning by column with dynamic number of partitions per columnSpark按列重新分区,每列具有动态分区数
【发布时间】:2019-10-08 12:36:31
【问题描述】:

如何根据列中项目数的计数对 DataFrame 进行分区。假设我们有一个包含 100 人的 DataFrame(列是 first_namecountry),我们想为一个国家/地区的每 10 人创建一个分区。

如果我们的数据集包含 80 个人来自中国、15 个人来自法国和 5 个人来自古巴,那么我们将需要 8 个中国分区、2 个法国分区和 1 个古巴分区。

以下代码不起作用:

  • df.repartition($"country"): 这样会为中国创建一个分区,为法国创建一个分区,为古巴创建一个分区
  • df.repartition(8, $"country", rand):这会为每个国家创建最多8个分区,所以应该为中国创建8个分区,但是法国和古巴的分区未知。法国可以分为 8 个分区,古巴最多可以分为 5 个分区。有关详细信息,请参阅this answer

这是repartition() 文档:

当我查看 repartition() 方法时,我什至没有看到采用三个参数的方法,所以看起来有些行为没有记录在案。

有没有办法动态设置每列的分区数?这将使创建分区数据集更加容易。

【问题讨论】:

  • 关于 3 个参数,$"country", rand 在第二次调用中作为partitionExprs 一起使用

标签: apache-spark


【解决方案1】:

由于 spark 分区数据的方式,您将无法完全实现这一点。 Spark 获取您在重新分区中指定的列,将该值散列为 64b 长,然后将该值与分区数取模。这样,分区的数量是确定的。它以这种方式工作的原因是连接需要在连接的左侧和右侧匹配数量的分区,此外还要确保两侧的哈希相同。

“我们希望为一个国家/地区的每 10 个人创建一个分区。”

您究竟想在这里完成什么?分区中只有 10 行可能对性能很不利。您是否尝试创建一个分区表,其中分区中的每个文件都保证只有 x 行?

"df.repartition($"country"): 这将为中国创建1个分区,为法国创建1个分区,为古巴创建1个分区"

这实际上将创建一个数据帧,其中包含按国家/地区散列的默认随机分区数

  def repartition(partitionExprs: Column*): Dataset[T] = {
    repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
  }

"df.repartition(8, $"country", rand):这将为每个国家创建最多 8 个分区,因此它应该为中国创建 8 个分区,但法国和古巴的分区是未知的。法国可能是在 8 个分区中,古巴最多可以在 5 个分区中。有关详细信息,请参阅此答案。"

同样明智地这是错误的。只有 8 个分区,国家在这 8 个分区中基本上是随机打乱的。

【讨论】:

  • 感谢您指出我的细微错误。对于 10 行,不需要此代码,但这在在倾斜的大型数据集上创建分区数据湖时非常重要。
【解决方案2】:

以下代码将为每个数据文件创建十行 (sample dataset is here):

val outputPath = new java.io.File("./tmp/partitioned_lake5/").getCanonicalPath
df
  .repartition(col("person_country"))
  .write
  .option("maxRecordsPerFile", 10)
  .partitionBy("person_country")
  .csv(outputPath)

这里是 Spark 2.2 之前的代码,它将为每个数据文件创建大约 10 行:

val desiredRowsPerPartition = 10

val joinedDF = df
  .join(countDF, Seq("person_country"))
  .withColumn(
    "my_secret_partition_key",
    (rand(10) * col("count") / desiredRowsPerPartition).cast(IntegerType)
  )

val outputPath = new java.io.File("./tmp/partitioned_lake6/").getCanonicalPath
joinedDF
  .repartition(col("person_country"), col("my_secret_partition_key"))
  .drop("count", "my_secret_partition_key")
  .write
  .partitionBy("person_country")
  .csv(outputPath)

【讨论】:

  • 你从哪里得到这个 col("count") 列?
猜你喜欢
  • 2021-12-21
  • 2017-04-17
  • 2019-06-01
  • 2019-03-02
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多