【问题标题】:Processing bzipped json file in Spark?在 Spark 中处理 bzip 压缩的 json 文件?
【发布时间】:2015-07-03 18:12:48
【问题描述】:

我在 S3 中有大约 200 个文件,例如 a_file.json.bz2,这些文件的每一行都是 JSON 格式的记录,但有些字段由 pickle.dumps 序列化,例如datetime 字段。 bzip压缩后每个文件大约1GB。现在我需要在 Spark(实际上是 pyspark)中处理这些文件,但我什至无法取出每条记录。那么这里的最佳做法是什么?

ds.take(10) 给了

[(0, u'(I551'),
 (6, u'(dp0'),
 (11, u'Vadv_id'),
 (19, u'p1'),
 (22, u'V479883'),
 (30, u'p2'),
 (33, u'sVcpg_id'),
 (42, u'p3'),
 (45, u'V1913398'),
 (54, u'p4')]

显然不是按每条记录进行拆分的。

谢谢。

【问题讨论】:

    标签: apache-spark pyspark


    【解决方案1】:

    我遇到了这个问题reading gpg-encrypted files。您可以按照 Daniel 的建议使用 wholeTextFiles,但在读取大文件时必须小心,因为整个文件将在处理之前加载到内存中。如果文件太大,可能会使执行程序崩溃。我使用了parallelizeflatMap。也许类似于

    def read_fun_generator(filename):
        with bz2.open(filename, 'rb') as f:
            for line in f:
                yield line.strip()
    
    bz2_filelist = glob.glob("/path/to/files/*.bz2")
    rdd_from_bz2 = sc.parallelize(bz2_filelist).flatMap(read_fun_generator)
    

    【讨论】:

    • 这对我有用,只需稍作修改。我必须 yield pickle.load(f) 代替。
    【解决方案2】:

    您可以通过SparkContext.wholeTextFiles 逐个文件(而不是逐行)访问输入文件。然后您可以使用flatMap 解压缩并解析您自己代码中的行。

    【讨论】:

      【解决方案3】:

      其实是pickle引起的问题。通过查看压缩后的文件内容,确实是

      (I551
      (dp0
      Vadv_id
      p1
      V479883
      p2
      sVcpg_id
      p3
      V1913398
      p4
      

      这让我难以解析。我知道我可以 pick.load(file) 多次取出对象,但在 Spark 中找不到快速解决方案,我只能逐行访问加载的文件。此外,此文件中的记录具有可变的字段和长度,这使得破解更加困难。

      我最终从源代码重新生成了这些bz2 文件,因为它实际上更容易、更快捷。而且我了解到 Spark 和 hadoop 完美支持bz2 压缩,因此无需额外操作。

      【讨论】:

        猜你喜欢
        • 2017-05-01
        • 1970-01-01
        • 2012-09-01
        • 2011-06-14
        • 2014-05-13
        • 2012-06-21
        • 2021-06-19
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多