【问题标题】:_corrupt_record error when reading a JSON file into Spark将 JSON 文件读入 Spark 时出现 _corrupt_record 错误
【发布时间】:2016-05-26 08:57:37
【问题描述】:

我有这个 JSON 文件

{
    "a": 1, 
    "b": 2
}

通过Python json.dump 方法获得。 现在,我想使用 pyspark 将此文件读入 Spark 中的 DataFrame。按照文档,我正在这样做

sc = SparkContext()

sqlc = SQLContext(sc)

df = sqlc.read.json('my_file.json')

打印 df.show()

虽然打印语句吐出了这个:

+---------------+
|_corrupt_record|
+---------------+
|              {|
|       "a": 1, |
|         "b": 2|
|              }|
+---------------+

任何人都知道发生了什么以及为什么它没有正确解释文件?

【问题讨论】:

    标签: python json dataframe pyspark


    【解决方案1】:

    如果您想保持 JSON 文件原样(不删除换行符 \n),请包含 multiLine=True 关键字参数

    sc = SparkContext() 
    sqlc = SQLContext(sc)
    
    df = sqlc.read.json('my_file.json', multiLine=True)
    
    print df.show()
    

    【讨论】:

      【解决方案2】:

      您需要在输入文件中的每一行有一个 json 对象,请参阅http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json

      如果您的 json 文件看起来像这样,它将为您提供预期的数据帧:

      { "a": 1, "b": 2 }
      { "a": 3, "b": 4 }
      
      ....
      df.show()
      +---+---+
      |  a|  b|
      +---+---+
      |  1|  2|
      |  3|  4|
      +---+---+
      

      【讨论】:

      • 如果我的 JSON 文件很大(有 100K 行)并且记录之间有很多新行(列或特征),我该如何解决?谢谢。
      • 也许使用jq 重新格式化(压缩)文件? @M.Rez
      • @ttimasdf 即使用jq -c 选项。
      【解决方案3】:

      在 Spark 2.2+ 中,您可以使用以下命令读取多行的 json 文件。

      val dataframe = spark.read.option("multiline",true).json( " filePath ")
      

      如果每行有json对象,则

      val dataframe = spark.read.json(filepath)
      

      【讨论】:

      • 这是 scala,不是 python。
      【解决方案4】:

      添加到@Bernhard 的精彩回答

      # original file was written with pretty-print inside a list
      with open("pretty-printed.json") as jsonfile:
          js = json.load(jsonfile)      
      
      # write a new file with one object per line
      with open("flattened.json", 'a') as outfile:
          for d in js:
              json.dump(d, outfile)
              outfile.write('\n')
      

      【讨论】:

        【解决方案5】:

        我想分享我的经验,我有一个 JSON 列 String 但使用 Python 表示法,这意味着我有 None 而不是 nullFalse 而不是 falseTrue 而不是 @ 987654326@.

        解析此列时,spark 会返回一个名为 _corrupt_record 的列。所以在解析 JSON 字符串之前我必须做的是用标准 JSON 表示法替换 Python 表示法:

        df.withColumn("json_notation",
            F.regexp_replace(F.regexp_replace(F.regexp_replace("_corrupt_record", "None", "null"), "False", "false") ,"True", "true")
        

        经过此转换后,我可以在 json_notation 列上使用函数 F.from_json(),并且 Pyspark 能够正确解析 JSON 对象。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 2019-08-13
          • 2022-01-13
          • 1970-01-01
          • 1970-01-01
          • 2023-03-03
          • 1970-01-01
          • 2021-07-01
          相关资源
          最近更新 更多