【问题标题】:Spark: writing DataFrame as compressed JSONSpark:将 DataFrame 编写为压缩的 JSON
【发布时间】:2015-11-03 03:56:18
【问题描述】:

Apache Spark 的 DataFrameReader.json() 可以自动处理压缩的 JSONlines 文件,但似乎没有办法让 DataFrameWriter.json() 编写压缩的 JSONlines 文件。额外的网络 I/O 在云中非常昂贵。

有没有办法解决这个问题?

【问题讨论】:

  • 你找到压缩 json 输出的方法了吗?我也在寻找解决方案。
  • 我还没有找到方法。

标签: apache-spark compression gzip dataframe apache-spark-sql


【解决方案1】:

使用 Spark 2.X(可能更早,我没有测试)有一种更简单的方法来编写压缩 JSON,不需要更改配置:

val df: DataFrame = ...
df.write.option("compression", "gzip").json("/foo/bar")

这也适用于 CSV 和 Parquet,只需在设置压缩选项后使用 .csv() 和 .parquet() 而不是 .json() 来写入文件。

可能的编解码器有:none、bzip2、deflate、gzip、lz4 和 snappy。

【讨论】:

  • 它似乎不适用于 Spark 2.X 之前的版本
【解决方案2】:

以下解决方案使用 pyspark,但我假设 Scala 中的代码类似。

第一个选项是在初始化 SparkConf 时设置以下内容:

conf = SparkConf()
conf.set("spark.hadoop.mapred.output.compress", "true")
conf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
conf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")

使用上面的代码,您使用 sparkContext 生成的任何文件都会使用 gzip 自动压缩。

第二个选项,如果您只想压缩上下文中的选定文件。假设“df”是您的数据框和文件名您的目的地:

df_rdd = self.df.toJSON() 
df_rdd.saveAsTextFile(filename,compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")

【讨论】:

  • Scala RDD API 是def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]),所以代码类应该直接传递而不是作为字符串传递。
  • 想知道在将数据存储到文件时是否可以避免使用 hadoopish 格式。我不能使用带有_SUCCESpart-* 文件的目录。我只需要一个特定命名的单个文件...
  • 抱歉复活了,但我很难相信conf.set("spark.hadoop.mapred.output.compression.codec", "true")是必要的
  • DataFrame 不是 RDD。全局更改压缩设置使其隐含也不是一个好习惯。
【解决方案3】:

SparkConf 上设置压缩选项不是一个好的做法,作为公认的答案。它全局改变了行为,而不是基于每个文件指示设置。事实是,显式总是优于隐式。在某些情况下,用户无法轻松操作上下文配置,例如 spark-shell 或设计为另一个子模块的代码。

正确的方法

自 Spark 1.4 起支持使用压缩写入 DataFrame。实现这一目标的几种方法:

一个

df.write.json("filename.json", compression="gzip")

就是这样!随意使用DataFrameWriter.json()

神奇隐藏在代码pyspark/sql/readwriter.py

@since(1.4)
def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
    """Saves the content of the :class:`DataFrame` in JSON format
    (`JSON Lines text format or newline-delimited JSON <http://jsonlines.org/>`_) at the
    specified path.

    :param path: the path in any Hadoop supported file system
    :param mode: ...

    :param compression: compression codec to use when saving to file. This can be one of the
                        known case-insensitive shorten names (none, bzip2, gzip, lz4,
                        snappy and deflate).
    :param dateFormat: ...
    :param timestampFormat: ...

    >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
    """
    self.mode(mode)
    self._set_opts(
        compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
    self._jwrite.json(path)

支持的压缩格式有 bzip2、gzip、lz4、snappy 和 deflate,不区分大小写。

scala API 应该是一样的。

另一个

df.write.options(compression="gzip").json("filename.json")

同上。可以提供更多选项作为关键字参数。从 Spark 1.4 开始可用。

第三

df.write.option("compression", "gzip").json("filename.json")

DataFrameWriter.option() 从 Spark 1.5 开始添加。一次只能添加一个参数。

【讨论】:

    猜你喜欢
    • 2018-01-09
    • 1970-01-01
    • 2019-02-19
    • 2021-11-30
    • 2013-09-23
    • 2019-03-17
    • 1970-01-01
    • 2017-03-03
    • 2013-06-18
    相关资源
    最近更新 更多