【发布时间】:2017-07-20 20:43:44
【问题描述】:
我的数据原则上是一个表,除了其他“数据”之外,它还包含一个列ID 和一个列GROUP_ID。
在第一步中,我将 CSV 读入 Spark,进行一些处理以准备第二步的数据,并将数据写入 parquet。
第二步做了很多groupBy('GROUP_ID')和Window.partitionBy('GROUP_ID').orderBy('ID')。
现在的目标是——为了避免在第二步中进行洗牌——在第一步中有效地加载数据,因为这是一个一次性计时器。
问题第 1 部分: AFAIK,Spark 在从 parquet 加载时保留分区(这实际上是任何“优化写入考虑”的基础) - 正确吗?
我想出了三种可能性:
df.orderBy('ID').write.partitionBy('TRIP_ID').parquet('/path/to/parquet')df.orderBy('ID').repartition(n, 'TRIP_ID').write.parquet('/path/to/parquet')df.repartition(n, 'TRIP_ID').sortWithinPartitions('ID').write.parquet('/path/to/parquet')
我会设置 n 以使各个 parquet 文件约为 100MB。
问题第 2 部分:三个选项在目标方面产生“相同”/相似的结果是否正确(避免在第二步中改组)?如果不是,有什么区别?哪个“更好”?
问题第 3 部分:三个选项中哪一个在第 1 步中表现更好?
感谢您分享您的知识!
编辑 2017-07-24
在进行了一些测试(写入和读取 parquet)之后,Spark 在第二步中似乎无法默认恢复 partitionBy 和 orderBy 信息。分区数(从df.rdd.getNumPartitions() 获得似乎取决于核心数和/或spark.default.parallelism(如果设置),而不是拼花分区数。所以回答问题 1 将是错误,问题 2 和 3 将无关紧要。
所以事实证明真正的问题是:有没有办法告诉 Spark,数据已经按列 X 分区并按列 排序是?
【问题讨论】:
-
如果您使用
df.repartition(n, 'TRIP_ID').write.partitionBy('TRIP_ID').parquet('/path/to/parquet'),它将创建 n 个镶木地板文件。如果现在您的spark.default.parallelism和spark.sql.shuffle.partitions等于 n 它将避免一个随机播放阶段,但是您使用此数据集。如果您在TRIP_ID上有过滤子句,则谓词下推将只读取该文件。 -
看看这个讲座youtu.be/99fYi2mopbs你可能会得到一些有见地的提示
标签: apache-spark pyspark apache-spark-sql pyspark-sql