【发布时间】:2023-03-26 04:58:02
【问题描述】:
请原谅我的简单问题,但我对 Spark/Hadoop 比较陌生。
我正在尝试将一堆小的 CSV 文件加载到 Apache Spark 中。它们目前存储在 S3 中,但如果这样可以简化操作,我可以在本地下载它们。我的目标是尽可能有效地做到这一点。让一些单线程主机下载和解析一堆 CSV 文件,而我的几十个 Spark 工作人员却无所事事,这似乎是一种耻辱。我希望有一种惯用的方式来分发这项工作。
CSV 文件按如下目录结构排列:
2014/01-01/fileabcd.csv
2014/01-01/filedefg.csv
...
我有两年的数据,每天都有目录,每个目录里面都有几百个 CSV。所有这些 CSV 都应该具有相同的架构,但当然有可能一个 CSV 有问题,如果有几个有问题的文件,我不希望整个工作崩溃。只要我在某处发生的日志中收到通知,就可以跳过这些文件。
似乎我想到的每个 Spark 项目都是这种形式,我不知道如何解决。 (例如,尝试读入一堆制表符分隔的weather data,或者读入一堆日志文件来查看这些文件。)
我的尝试
我已经尝试过 SparkR 和 Scala 库。我真的不在乎我需要使用哪种语言。我对使用正确的习语/工具更感兴趣。
纯 Scala
我最初的想法是枚举和parallelize 的所有year/mm-dd 组合的列表,这样我就可以让我的 Spark 工作人员每天都独立处理(下载并解析所有 CSV 文件,然后将它们堆叠在一起( unionAll()) 减少它们)。不幸的是,使用 spark-csv 库下载和解析 CSV 文件只能在“父”/主作业中完成,而不是从每个子作为 Spark doesn't allow job nesting 完成。因此,只要我想使用 Spark 库进行导入/解析,这将不起作用。
混合语言
当然,您可以使用该语言的原生 CSV 解析来读取每个文件,然后将它们“上传”到 Spark。在 R 中,这是一些包的组合,用于从 S3 中获取文件,然后是 read.csv,最后以 createDataFrame() 将数据导入 Spark。不幸的是,这真的很慢,而且似乎倒退了我希望 Spark 工作的方式。如果我的所有数据在进入 Spark 之前都通过 R 管道传输,为什么还要使用 Spark?
Hive/Sqoop/Phoenix/Pig/Flume/Flume Ng/s3distcp
我已经开始研究这些量身定制的工具,很快就不知所措了。我的理解是,许多/所有这些工具都可用于将我的 CSV 文件从 S3 获取到 HDFS。
当然,从 HDFS 读取我的 CSV 文件会比 S3 更快,这样可以解决部分问题。但是我仍然有数以万计的 CSV 需要解析,并且我不知道在 Spark 中使用分布式方式来执行此操作。
【问题讨论】:
-
注意:对于日志文件,s3distcp (docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/…) 有一个
--groupBy选项,允许您在复制时将文件连接在一起。这对于日志文件来说会很好,但对于 CSV 可能不是这样,当您去解析主 CSV 时,您现在正在创建一个全有或全无的场景。还有标题行。最好单独解析和聚合每个 CSV 文件。
标签: csv amazon-s3 apache-spark sparkr