【问题标题】:Efficiently Aggregate Many CSVs in Spark在 Spark 中高效聚合多个 CSV
【发布时间】:2023-03-26 04:58:02
【问题描述】:

请原谅我的简单问题,但我对 Spark/Hadoop 比较陌生。

我正在尝试将一堆小的 CSV 文件加载到 Apache Spark 中。它们目前存储在 S3 中,但如果这样可以简化操作,我可以在本地下载它们。我的目标是尽可能有效地做到这一点。让一些单线程主机下载和解析一堆 CSV 文件,而我的几十个 Spark 工作人员却无所事事,这似乎是一种耻辱。我希望有一种惯用的方式来分发这项工作。

CSV 文件按如下目录结构排列:

2014/01-01/fileabcd.csv
2014/01-01/filedefg.csv
...

我有两年的数据,每天都有目录,每个目录里面都有几百个 CSV。所有这些 CSV 都应该具有相同的架构,但当然有可能一个 CSV 有问题,如果有几个有问题的文件,我不希望整个工作崩溃。只要我在某处发生的日志中收到通知,就可以跳过这些文件。

似乎我想到的每个 Spark 项目都是这种形式,我不知道如何解决。 (例如,尝试读入一堆制表符分隔的weather data,或者读入一堆日志文件来查看这些文件。)

我的尝试

我已经尝试过 SparkR 和 Scala 库。我真的不在乎我需要使用哪种语言。我对使用正确的习语/工具更感兴趣。

纯 Scala

我最初的想法是枚举和parallelize 的所有year/mm-dd 组合的列表,这样我就可以让我的 Spark 工作人员每天都独立处理(下载并解析所有 CSV 文件,然后将它们堆叠在一起( unionAll()) 减少它们)。不幸的是,使用 spark-csv 库下载和解析 CSV 文件只能在“父”/主作业中完成,而不是从每个子作为 Spark doesn't allow job nesting 完成。因此,只要我想使用 Spark 库进行导入/解析,这将不起作用。

混合语言

当然,您可以使用该语言的原生 CSV 解析来读取每个文件,然后将它们“上传”到 Spark。在 R 中,这是一些包的组合,用于从 S3 中获取文件,然后是 read.csv,最后以 createDataFrame() 将数据导入 Spark。不幸的是,这真的很慢,而且似乎倒退了我希望 Spark 工作的方式。如果我的所有数据在进入 Spark 之前都通过 R 管道传输,为什么还要使用 Spark?

Hive/Sqoop/Phoenix/Pig/Flume/Flume Ng/s3distcp

我已经开始研究这些量身定制的工具,很快就不知所措了。我的理解是,许多/所有这些工具都可用于将我的 CSV 文件从 S3 获取到 HDFS。

当然,从 HDFS 读取我的 CSV 文件会比 S3 更快,这样可以解决部分问题。但是我仍然有数以万计的 CSV 需要解析,并且我不知道在 Spark 中使用分布式方式来执行此操作。

【问题讨论】:

  • 注意:对于日志文件,s3distcp (docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/…) 有一个--groupBy 选项,允许您在复制时将文件连接在一起。这对于日志文件来说会很好,但对于 CSV 可能不是这样,当您去解析主 CSV 时,您现在正在创建一个全有或全无的场景。还有标题行。最好单独解析和聚合每个 CSV 文件。

标签: csv amazon-s3 apache-spark sparkr


【解决方案1】:

我之前遇到过这个问题(但是会读取大量 Parquet 文件),我的建议是避免使用数据帧并使用 RDD。

常用的成语是:

  1. 读入文件列表,每个文件都是一行(在驱动程序中)。这里的预期输出是一个字符串列表
  2. 并行化字符串列表并使用客户 csv 阅读器映射它们。返回的是案例类别列表。

如果您最终想要一个可以重写为 parquet 或数据库的数据结构,例如 List[weather_data],您也可以使用 flatMap。

【讨论】:

    【解决方案2】:

    所以现在(Spark 1.4)SparkR 支持jsonparquet 文件结构。可以解析 CSV 文件,但随后需要使用额外的 jar 启动 spark 上下文(需要下载并放置在适当的文件夹中,我自己从未这样做过,但我的同事有)。

    sc <- sparkR.init(sparkPackages="com.databricks:spark-csv_2.11:1.0.3")
    sqlContext <- sparkRSQL.init(sc)
    

    the docs 中有更多信息。我希望更新的 spark 版本会对此有更多的支持。

    如果您不这样做,则需要使用不同的文件结构或使用 python 将所有文件从 .csv 转换为 .parquet。这是最近一次 python 演讲中的一个 sn-p。

    data = sc.textFile(s3_paths, 1200).cache()
    
    def caster(x):
        return Row(colname1 = x[0], colname2 = x[1])
    
    df_rdd = data\
        .map(lambda x: x.split(','))\
        .map(caster)
    
    ddf = sqlContext.inferSchema(df_rdd).cache()
    
    ddf.write.save('s3n://<bucket>/<filename>.parquet')
    

    另外,您的数据集有多大?您甚至可能不需要 spark 进行分析。请注意,现在也是;

    • SparkR 仅支持 DataFrame。
    • 还没有分布式机器学习。
    • 对于可视化,如果您想使用像 ggplot2 这样的库,您需要将分布式数据框转换回普通数据框。
    • 如果您的数据集不超过几 GB,那么学习 spark 的额外麻烦可能还不值得
    • 现在还算适中,但您可以期待更多未来

    【讨论】:

    • 谢谢。但是第一种方法不会仍然在主服务器上完成所有下载/解析吗?我在 SparkR 工作中读取了 read.df 的 CSV,但这似乎是一种处理数万个 CSV 文件的低效/串行方式。 CSV -> parquet 方法可能是我必须采用的方法,但不得不对它进行批量预处理似乎是一种耻辱。需要明确的是,这是从工作人员那里并行下载 S3 吗?或者sc.textFile 会连续获取每一个?
    • 哦,我的数据肯定不够大,不足以配得上 Spark。我只是想在“大数据”趋势下成为一项好运动。 :)
    • 请注意sc.textFile 是python 代码,我告诉spark 使用1200 分区。这将在以后“强制”并行,我相信在读取数据时也是如此。不太确定。请注意,sc.textFile 方法接受通配符。 docs
    • 不好意思是密集的,但是在从master串行下载数据之后“强制并行”?还是在下载阶段强制执行?
    • 我认为两者兼而有之。 mailing on subject。我记得玩过 num_partitions 并看到它导致不同的读取时间。尝试确认不会有什么坏处。
    猜你喜欢
    • 2017-04-22
    • 1970-01-01
    • 2019-02-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多