【问题标题】:Spark dataframe is not ordered after sortSpark数据框在排序后未排序
【发布时间】:2019-01-08 21:07:33
【问题描述】:

我正在处理一个 JSON 文件以使用 Spark(版本 1.6.1)生成两个 JSON 文件。输入文件大小约为 30~40G(100M 记录)。对于生成的文件,较大的大约10G~15G(30M记录),较小的大约500M~750M(1.5M记录)。两个结果文件都面临以下问题:

我为数据框调用了“排序”方法,然后执行“重新分区”以将结果合并到一个文件中。然后我检查了生成的文件,发现在一个区间内记录是有序的,但整个文件不是全局排序的。例如文件中最后一条记录(第 1.9M 行)的键(由 3 列构成)为“(ou7QDj48c, 014, 075)”,但文件中一条中间记录的键(第 375K 行)为“( pzwzh5vm8, 003, 023)"

pzwzh5vm8 003 023
...
ou7QDj48c 014 075

当我使用相对较小的输入源(输入文件 400K 行)在本地测试代码时,根本不会发生这种情况。

我的具体代码如下所示:

big_json = big_json.sort($"col1", $"col2", $"col3", $"col4")
big_json.repartition(1).write.mode("overwrite").json("filepath")

谁能给个建议?谢谢。

(我也注意到this thread 讨论了类似的问题,但到目前为止还没有一个好的解决方案。如果这种现象真的是由重新分区操作引起的,谁能帮我有效地将数据帧转换为单个 JSON文件而不将其转换为 RDD,同时保持排序顺序?谢谢)

解决方案

非常感谢@manos @eliasah 和@pkrishna 的帮助。在阅读了您的 cmets 后,我曾考虑过使用 coalesce,但在研究了它的性能后,我放弃了这个想法。

最终的解决方案是:对数据帧进行排序并写入 JSON,无需任何重新分区或合并。全部工作完成后,调用下面的HDFS命令

hdfs dfs -getmerge /hdfs/file/path/part* ./local.json

这个命令比我想象的要好得多。它既不需要太多时间,也不需要太多空间,并且给了我一个很好的单个文件。我刚刚在巨大的结果文件上使用了headtail,它看起来完全有序。

【问题讨论】:

  • 不要重新分区。让它创建多个文件,然后一个一个地读取它们,它们应该按照正确的排序顺序。
  • 您可能希望发布 solution 部分作为答案。

标签: apache-spark apache-spark-sql


【解决方案1】:

发生的情况是您重新分区 您的排序操作之后。

repartition 随机重新排列 RDD 中的数据以创建更多或更少的分区并在它们之间进行平衡。这总是对网络上的所有数据进行洗牌。

在底层,它使用coalesceshuffle 重新分配数据。 这就是您的数据不再排序的原因。

您可以查看reference的代码。

【讨论】:

    【解决方案2】:

    由于在您的示例中分区计数设置为 1,这意味着分区减少到 1。

    为了减少 rdd 中的分区数量,spark 提供了一个转换合并(使用 shuffle=false)来保留顺序。

    作为 eliasah,提到了使用合并的引擎盖下的重新分区。它使用 shuffle=true 调用合并。所以可以使用合并转换来代替 shuffle=false 的重新分区。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-05-16
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-12-18
      • 2016-01-14
      • 1970-01-01
      相关资源
      最近更新 更多