【发布时间】:2016-06-07 05:02:14
【问题描述】:
我正在尝试读取多个 JSON 来创建一个 DataFrame。
我将多个 JSON 文件放在一个 PythonRDD 中,然后当我尝试转换为 DataFrame 时它失败了。要么我使用toDF() 或sqlContext.createDataFrame() 方法,我得到以下错误:
ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling
这很奇怪,因为使用 sqlContext.read.json() 可以正常工作。
这是我的代码:
import json
from pyspark.sql import Row
def dict_to_row(obj):
if isinstance(obj, dict) and len(obj.values())>0:
d = {}
for k in obj.keys():
d[k] = dict_to_row(obj[k])
return Row(**d)
elif isinstance(obj, list):
return [dict_to_row(o) for o in obj]
else:
return obj
def distributed_json_read(filename):
jsons = open(filename,'r')
json_list = jsons.readlines()
for e in json_list:
json_row = json.loads(e.rstrip())
yield dict_to_row(json_row)
json_list = ['test1.json','test2.json']
parallel_keys = sc.parallelize(json_list)
data_rdd = parallel_keys.flatMap(distributed_json_read)
df = sqlContext.createDataFrame(data_rdd)
这里有一个 test1.json 的例子:
{
"data": {
"f": {
"a": {
"a1": 100,
"a2": 1
},
"b": [
{
"b1": {
"b11": 1,
"b12": null
},
"date1": "2016-02-05T01:58:04.000-0400",
"b2": {
"b21": null,
"b22": "9ca6d130fddb",
"b23": false
}
}
]
}
},
"id": 1689
}
有人遇到过这个错误吗?
实际上我的目标是读取多个 JSONs 文件,这些文件可以具有不同的模式,但最后构建一个 DataFrame,其模式将是 JSONs 模式的联合。如果参数是包含多个 JSON 的文件,则类似于使用 sqlContext.read.json() 可以实现的效果。
【问题讨论】:
-
我有点困惑。为什么要创建 Rows 而不是直接读取 JSON?
-
嗨@zero323,我的最终目标是从不同的 S3 存储桶中读取 JSON。我想并行阅读它们,这就是为什么我无法使用 read.json() 函数
标签: apache-spark pyspark apache-spark-sql