【问题标题】:using wholeTextFiles in pyspark but get the error of out of memory在pyspark中使用wholeTextFiles但得到内存不足的错误
【发布时间】:2016-03-31 07:33:21
【问题描述】:

我有一些文件(part-00000.gz、part-00001.gz、part-00002.gz、...),每个部分都相当大。我需要使用每个部分的文件名,因为它包含时间戳信息。据我所知,在 pyspark 中似乎只有 wholeTextFiles 可以将输入读取为(文件名,内容)。但是,使用 wholeTextFiles 时出现内存不足的错误。所以,我的猜测是,wholeTextFiles 在没有分区操作的情况下读取整个部分作为映射器中的内容。我也找到了这个答案(How does the number of partitions affect `wholeTextFiles` and `textFiles`?)。如果是这样,我怎样才能得到一个相当大的部分文件的文件名。谢谢

【问题讨论】:

    标签: python apache-spark pyspark


    【解决方案1】:

    您收到错误是因为wholeTextFiles 尝试将整个文件读入单个 RDD。您最好逐行阅读文件,只需编写自己的生成器并使用flatMap 函数即可。这是一个 example 这样做以读取 gzip 文件:

    import gzip
    def read_fun_generator(filename):
        with gzip.open(filename, 'rb') as f:
            for line in f:
                yield line.strip()
    
    gz_filelist = glob.glob("/path/to/files/*.gz")
    rdd_from_bz2 = sc.parallelize(gz_filelist).flatMap(read_fun_generator)
    

    【讨论】:

    • 我在亚马逊 s3 工作。 glob.glob 有效吗?您的答案似乎也读取了给定文件名(part-00000)的文件的所有行。我应该用 sc.textfile 替换 bz2.open
    • 我不仅要使用文件名,还要使用内容。看来您的回答仍然是逐行读取 RDD 中的整个部分文件。
    • 您必须调整此代码才能完全按照您的意愿行事。 glob 命令其实只是获取文件名列表,保存到bz2_filelist。这个想法是创建一个文件名的 RDD(这是 parallelize 所做的),然后为每个文件名读取该文件中的每一行。请注意,您可以访问此生成器中的文件名。例如,如果你想在每一行添加文件名,你可以使用yield filename + "|" + line.strip()
    猜你喜欢
    • 2021-10-05
    • 1970-01-01
    • 2016-07-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-05-13
    • 2012-08-14
    相关资源
    最近更新 更多