【发布时间】:2020-01-23 19:02:39
【问题描述】:
pyspark 初学者在这里 - 我有一个 spark 数据框,其中每一行都是 s3 上的 url。 每个 url 都是 JSON 数组的 GZIP 文件,我可以将数据框中的每一行(链接)解析为 python 列表,但我不知道如何从这个 JSON 列表中创建多行。
这是我使用的返回 json 列表的函数:
def distributed_read_file(url):
s3_client = boto3.client('s3')
result = s3_client.get_object(Bucket=raw_data_bucket_name, Key=url)
bytestream = BytesIO(result['Body'].read())
string_json = GzipFile(None, 'rb', fileobj=bytestream).read().decode('utf-8')
list_of_jsons = json.loads(string_json)
例如,如果这些是列表中的 JSON 对象:
[{"a": 99, "b": 102}, {"a": 43, "b": 87}]
我想在 URLS 数据帧上运行一个函数,例如:
result_df = urls_rdd.map(distributed_read_file)
并获得包含以下列的数据框:a 和 b(JSON 键)。 当我尝试这样做时,我将每个 json 对象作为 MapType 列取回,我很难使用它。
非常感谢,我希望它清楚!
【问题讨论】:
-
你能显示你得到的当前输出吗?