【问题标题】:How to convert PythonRDD (of lines in JSONs) to DataFrame?如何将 PythonRDD(JSON 中的行)转换为 DataFrame?
【发布时间】:2016-06-07 05:02:14
【问题描述】:

我正在尝试读取多个 JSON 来创建一个 DataFrame。

我将多个 JSON 文件放在一个 PythonRDD 中,然后当我尝试转换为 DataFrame 时它失败了。要么我使用toDF()sqlContext.createDataFrame() 方法,我得到以下错误:

ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling

这很奇怪,因为使用 sqlContext.read.json() 可以正常工作。

这是我的代码:

import json
from pyspark.sql import Row

def dict_to_row(obj):
    if isinstance(obj, dict) and len(obj.values())>0:
        d = {}
        for k in obj.keys():
            d[k] = dict_to_row(obj[k])
            return Row(**d)
    elif isinstance(obj, list):
        return [dict_to_row(o) for o in obj]
    else:
        return obj

def distributed_json_read(filename):
    jsons = open(filename,'r')
    json_list = jsons.readlines()
    for e in json_list:
        json_row = json.loads(e.rstrip())
        yield dict_to_row(json_row)

json_list = ['test1.json','test2.json']
parallel_keys = sc.parallelize(json_list)
data_rdd = parallel_keys.flatMap(distributed_json_read)
df = sqlContext.createDataFrame(data_rdd)

这里有一个 test1.json 的例子:

{
    "data": {
        "f": {
            "a": {
                "a1": 100,
                "a2": 1
            },
            "b": [
                {
                    "b1": {
                        "b11": 1,
                        "b12": null
                    },
                    "date1": "2016-02-05T01:58:04.000-0400",
                    "b2": {
                        "b21": null,
                        "b22": "9ca6d130fddb",
                        "b23": false
                    }
                }
            ]
        }
    },
    "id": 1689
}

有人遇到过这个错误吗?

实际上我的目标是读取多个 JSONs 文件,这些文件可以具有不同的模式,但最后构建一个 DataFrame,其模式将是 JSONs 模式的联合。如果参数是包含多个 JSON 的文件,则类似于使用 sqlContext.read.json() 可以实现的效果。

【问题讨论】:

  • 我有点困惑。为什么要创建 Rows 而不是直接读取 JSON?
  • 嗨@zero323,我的最终目标是从不同的 S3 存储桶中读取 JSON。我想并行阅读它们,这就是为什么我无法使用 read.json() 函数

标签: apache-spark pyspark apache-spark-sql


【解决方案1】:

我之前为 spark 编写了一个自定义 json 阅读器。我在包含 json 文件的文件夹上使用了 sc.wholeTextFiles() 或 sc.binaryFiles()。

这将为您提供 (file_url, wholeFile/BinaryFile) 的 rdd (k,v) 然后你可以在那个rdd上应用你的平面地图


rdd = sc.wholeTextFiles("super_folder_containing_jsons")
data_rdd = rdd.flatMap(distributed_json_read)
df = sqlContext.createDataFrame(data_rdd)

【讨论】:

  • 我不确定您的解决方案是否有效,因为如果您在 flatMap 中使用函数(例如),您将无法访问 sc (SparkContext)。
  • 是的,同意,并且你不需要在你的distributed_json_read或dict_to_row中使用sc,这个解决方案。
【解决方案2】:

您要么为数据框提供静态架构(所有类型的超集),要么为代码提供第一个包含所有字段的 json,这将有助于拥有默认架构。

当您没有默认架构并且您提供 json(具有较少字段)时的一个问题,它可能在稍后读取具有新字段的新 json 文件时出现问题。

【讨论】:

  • 我事先知道全局模式,但似乎当 Spark 尝试构建最终 RDD 时,如果 2 个或更多 JSON 没有相同的模式。
【解决方案3】:

Spark 中的 JSON 必须是单行的,即单个 JSON 文件应该是一行。

scala> final case class Token(id: Int, body: String)
defined class Token

scala> val df = spark.createDataset(Seq(Token(0, "hello"), Token(1, "world")))
df: org.apache.spark.sql.Dataset[Token] = [id: int, body: string]

scala> df.show
+---+-----+
| id| body|
+---+-----+
|  0|hello|
|  1|world|
+---+-----+

scala> df.write.json("so.json")

// $ cat so.json/part-r-00003-469964b4-aaf8-4c7a-8f8a-d76c08e792ce.json
// {"id":0,"body":"hello"}

【讨论】:

  • 实际上 JSON 是单行的,我只是把漂亮的打印出来。
猜你喜欢
  • 1970-01-01
  • 2022-01-22
  • 1970-01-01
  • 2018-03-30
  • 1970-01-01
  • 2017-04-13
  • 2021-03-29
  • 1970-01-01
相关资源
最近更新 更多