【发布时间】:2019-02-18 00:19:38
【问题描述】:
我有一个看起来像这样的数据框:
>>> l = [('a', 'foo', 1), ('b', 'bar', 1), ('a', 'biz', 6), ('c', 'bar', 3), ('c', 'biz', 2)]
>>> df = spark.createDataFrame(l, ('uid', 'code', 'level'))
>>> df.show()
+---+----+-----+
|uid|code|level|
+---+----+-----+
| a| foo| 1|
| b| bar| 1|
| a| biz| 6|
| c| bar| 3|
| c| biz| 2|
+---+----+-----+
我要做的是将code 和level 值分组到list 或dict 中,并将该列表转储为JSON 字符串,以便我可以将数据帧保存到磁盘。结果如下:
>>> df.show()
+---+--------------------------+
|uid| json |
+---+--------------------------+
| a| '[{"foo":1}, {"biz":6}]' |
| b| '[{"bar":1}]' |
| c| '[{"bar":3}, {"biz":2}]' |
+---+--------------------------+
我对使用 PySpark 还是很陌生,我在弄清楚如何获得这个结果时遇到了很多麻烦。我几乎肯定需要一个groupBy,我已经尝试通过创建一个名为“json”的新StringType 列然后使用pandas_udf 装饰器来实现这一点,但是我遇到了关于无法使用的类型的错误,因为我我发现,我访问数据的方式是访问整列,而不仅仅是行。
>>> df = df.withColumn('json', F.list(''))
>>> schema = df.schema
>>> @pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
..: def to_json(pdf):
..: return pdf.assign(serial=json.dumps({pdf.code:pdf.level}))
我考虑过在两列之间使用字符串连接并使用collect_set,但这也感觉不对,因为它有可能将无法加载 JSON 的内容写入磁盘,因为它具有字符串表示形式。任何帮助表示赞赏。
【问题讨论】: