【发布时间】:2021-02-15 11:03:27
【问题描述】:
我的数据集有 20000 个文件,每个文件都很小。 我将如何减少文件数量以及最佳数量是多少?
【问题讨论】:
标签: palantir-foundry foundry-code-repositories
我的数据集有 20000 个文件,每个文件都很小。 我将如何减少文件数量以及最佳数量是多少?
【问题讨论】:
标签: palantir-foundry foundry-code-repositories
执行此操作的最直接方法是在转换结束时显式执行repartition()(或coalesce(),如果分区数从原始数量严格减少)。
这需要是您返回/写出结果之前的最后一次调用。
这看起来像:
# ...
@transform_df(
# ... inputs
)
def my_compute_function(my_inputs):
# ... my transform logic ...
df = df.coalesce(500)
# df = df.repartition(500) # this also works but is slightly slower than coalesce
return df
这是称为“分桶”的前导步骤,以供参考。
最佳存储桶数量取决于您操作的数据规模。通过在成功构建后观察磁盘上数据集的总大小来计算最佳存储桶数有点简单。
如果您的数据集大小为 128GB,您最终会希望得到 128MB 的文件,因此您的存储桶数为:
128 GB * (1000 MB / 1 GB) * (1 file / 128MB) -> 1000 files
注意:这不是一个精确的计算,因为由于 Snappy + Parquet 写出中使用的数据压缩,更改存储桶计数后您的最终数据集大小会有所不同。您会注意到文件大小与您预期的略有不同,因此上述示例中最终可能需要 1100 或 900 个文件
【讨论】:
由于这是一个我必须解决很多次的问题,因此我决定编写一份更详细的指南,其中包含一系列不同的技术、优缺点以及存在的理由。
避免使用包含许多文件的数据集有几个很好的理由:
最终得到包含许多文件的数据集通常是由以下三个原因之一引起的:
groupBy 时(这意味着洗牌),spark 默认选择将数据重新分区到 200 个新分区中,这对于 e.g. 来说太多了。增量变换。由于分区错误(下文讨论),转换也可能产生过多的输出文件。接下来,我将列出我知道的所有减少数据集中文件数的方法,以及它们的优缺点,以及适用时的一些特征。
最好的选择之一是首先避免有很多文件。从例如摄取许多文件时类似文件系统的源,像“连接转换器”这样的 magritte 转换器可能有助于将许多 CSV、JSON 或 XML 文件组合成一个文件。在适用时,连接然后应用 gzip 转换器是一种特别有效的策略,因为它通常会将 XML 和类似文本格式的大小减少 94% 左右。
主要的限制是要应用这个,你需要
也可以将许多文件压缩成更少的文件(使用 .tar.bz2、.tar.gz、.zip、.rar 等格式),但这随后需要知道的下游转换这种文件格式并手动解压缩它(文档中提供了一个示例),因为代工厂无法透明地提供这些档案中的数据。然而,没有预制的 magritte 处理器可以做到这一点,并且在我应用这种技术的情况下,我使用 bash 脚本在摄取之前执行此任务,这无疑不太理想。
Foundry 中有一种新机制,可以将您写入的数据集与读取的数据集分离。本质上,有一个后台作业正在运行,它在您附加文件时将文件打乱到优化的索引中,因此数据集的读取(大部分)可以转到这个优化的索引,而不是作者留下的(通常有些随意的)数据布局。
这有很多好处(例如自动生成针对最常见读取模式优化的数据布局),其中之一是它可以在后台“压缩”您的数据集。
从这样的数据集读取时,您的读取基本上会命中索引以及输入数据集(其中包含尚未被后台进程合并到索引中的任何文件。)
最大的优势是这会在后台自动发生,无论您的数据摄取或转换有多混乱,您都可以简单地写出数据(写入时不会受到任何性能影响,并尽快将数据提供给消费者)虽然最终仍然会得到一个分区良好且文件很少的数据集(最终)。
这里的主要限制是这仅适用于 spark 可以本机理解的格式的数据集,例如 parquet、avro、json、csv ......如果你有例如摄取任意文件,解决方法可以是将它们打包到例如摄入前的镶木地板。这样一来,Foundry 仍然可以随着时间的推移合并多个 Parquet 文件。
此功能尚不完全可供最终用户使用(但计划默认为所有内容启用。)如果您认为这是您的管道之一最理想的解决方案,您的 palantir POC 可以启动工单与团队一起启用此功能。
Coalescing 是 spark 中的一种操作,可以减少分区数量而不会产生广泛的依赖关系(spark 中唯一的这种操作)。合并很快,因为它最大限度地减少了洗牌。它的工作原理与以前的 spark 版本相比发生了变化(并且有很多相互矛盾的信息),但它通常比 repartition 更快。但是,它有一个很大的警告:它会降低整个转换的并行度。
即使您在写入数据之前的最后coalesce,spark 也会调整整个查询计划以在整个过程中使用更少的分区,从而减少使用的执行程序,这意味着您得到的更少并行性。
重新分区类似,但它插入了一个完整的洗牌阶段。这会带来更高的性能成本,但这意味着从这个阶段出来的数据基本上可以保证是良好分区的(不管输入)。虽然 repartition 本身有点昂贵,但它不会受到整个转换过程中降低并行度的问题的影响。
这意味着总体而言,如果您最终写出的数据量与您之前所做的工作量相比并没有那么大,那么使用repartition 而不是coalesce,您通常会获得更好的性能,如在更多执行者上处理数据的能力最终超过了洗牌的缺点。根据我的经验,repartition 通常会在这里胜出,除非您的转换非常简单。
一个值得讨论的特定用例是增量管道。如果您的增量管道相对简单,并且仅适用于例如映射和过滤,然后执行coalesce 就可以了。然而,许多增量管道也会读取非常大数据集的快照视图。例如,增量管道可能会接收一行新数据,并读取整个先前的输出数据集(可能数百万行),因此查看输出数据集中是否已存在该行。如果已经存在,则不发出任何行,如果不存在,则追加该行。将一小部分增量数据与大型静态数据集等相结合时会发生类似情况。
在这种情况下,转换是增量的,但它仍然受益于高并行性,因为它仍然处理大量数据。
我的粗略指导是:
repartition 到一个合理的数字coalesce(1)
repartition(1)
如果写入速度/流水线延迟非常重要,则这些选项都不可接受。在这种情况下,我会考虑背景压缩。
作为前一点的扩展,为了保持增量管道的高性能,我喜欢在它们上安排定期快照,这允许我每隔一段时间重新分区数据集,执行基本上是“压缩”的操作。
我已经描述了如何在此处进行设置的机制:How to force an incremental Foundry Transforms job to build non-incrementally without bumping the semantic version?
我通常会在例如周末。在一周内,管道中的每个数据集(可能有数百个数据集)将累积数千或数万个事务和文件。然后在周末,随着计划的快照在管道中滚动,每个数据集将被重新分区为一百个文件。
最近,AQE 在代工厂中可用。 AQE 本质上(出于本次讨论的目的)将coalesce 操作注入到您已经进行随机操作的阶段,具体取决于先前操作的结果。这通常会改善分区(从而提高文件计数),但据报道在极少数情况下也会使情况变得更糟(但我自己没有观察到这一点)。
AQE 默认启用,但如果您想尝试禁用它,可以将 spark 配置文件应用于您的转换。
Bucketing 和 partitioning 与此讨论有些相切,因为它们主要是关于布置数据以优化读取数据的特定方法。目前,这些技术都不适用于增量管道。
一个常见的错误是写出由高基数列(例如时间戳)分区的数据集。在具有 1000 万个唯一时间戳的数据集中,这将导致(至少)1000 万个文件出现在输出数据集中。
在这些情况下,应修复转换,并应通过应用保留删除旧事务(包含数百万个文件)。
其他压缩数据集的技巧也是可行的,例如创建“环回”转换来读取先前的输出并将其重新分区,或者手动打开数据集上的事务以重写它。
这些非常hacky,但在我看来是不可取的,应该避免。如今,背景压缩主要以一种更优雅、更可靠且更简洁的方式解决了这个问题。
【讨论】: