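【Question title】: Merge multiple rows of a dataframe into one record
【Posted】: 2020-01-24 01:13:25
【Question description】:

I need to merge all rows of a PySpark DataFrame into a single list and then add one extra attribute, so the result can be sent to an API as a batch. This is what the JSON from the df looks like: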

{"event_type":"click","visitor_platform":"mobile","visitor_country":"CA","mp_os":"Android", "user_properties":{"distinct_id":123, "user_id":345} }
{"event_type":"click","visitor_platform":"mobile","visitor_country":"US","mp_os":"Android", "user_properties":{"distinct_id":321, "user_id":543} }

I want to add an extra attribute named load and turn the rows into a single record that holds them as a list:

{ "load":123, "events": [ { "event_type":"click","visitor_platform":"mobile","visitor_country":"CA","mp_os":"Android", "user_properties":{"distinct_id":123, "user_id":345} },{"event_type":"click","visitor_platform":"mobile","visitor_country":"US","mp_os":"Android", "user_properties":{"distinct_id":321, "user_id":543} } ] }

【Comments】:

  • What have you tried so far?

Tags: json pyspark apache-spark-sql


【Solution 1】:

Create a new dictionary, and for its events key call df.toJSON().collect(). This returns one JSON string per row.

>>> df.show()
+-------+------+-------------------+
|user_id|amount|         trans_date|
+-------+------+-------------------+
|    101| 99.10|2019-06-04 00:00:00|
|    102| 89.27|2019-06-04 00:00:00|
|    102| 89.10|2019-03-04 00:00:00|
|    103| 73.11|2019-09-10 00:00:00|
|    101|-69.81|2019-09-11 00:00:00|
|    101| 12.51|2018-12-14 00:00:00|
|    101| 43.23|2018-09-11 00:00:00|
+-------+------+-------------------+
>>> dict1 = {"load": 123, "events": df.toJSON().collect()}
>>> dict1
{'load': 123, 'events': ['{"user_id":"101","amount":"99.10","trans_date":"2019-06-04T00:00:00.000+05:30"}', '{"user_id":"102","amount":"89.27","trans_date":"2019-06-04T00:00:00.000+05:30"}', '{"user_id":"102","amount":"89.10","trans_date":"2019-03-04T00:00:00.000+05:30"}', '{"user_id":"103","amount":"73.11","trans_date":"2019-09-10T00:00:00.000+05:30"}', '{"user_id":"101","amount":"-69.81","trans_date":"2019-09-11T00:00:00.000+05:30"}', '{"user_id":"101","amount":"12.51","trans_date":"2018-12-14T00:00:00.000+05:30"}', '{"user_id":"101","amount":"43.23","trans_date":"2018-09-11T00:00:00.000+05:30"}']}
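Note that each element of events above is a JSON string, not a nested object. The following is a minimal sketch of what that means once the dictionary is serialized for an API call. It uses only the standard json module, and the two sample strings are shortened copies of the rows above, so no Spark session is needed to run it.

```python
import json

# Two strings in the shape returned by df.toJSON().collect()
# (shortened copies of the rows shown above).
events_as_strings = [
    '{"user_id":"101","amount":"99.10"}',
    '{"user_id":"102","amount":"89.27"}',
]

payload = {"load": 123, "events": events_as_strings}
body = json.dumps(payload)

# After a round trip, each event is still a plain string, because
# json.dumps encoded it as a JSON string with escaped quotes.
decoded = json.loads(body)
first_event_type = type(decoded["events"][0]).__name__

print(first_event_type)
print(body)
```

If the API expects events to be a list of objects, as in the question, the strings have to be parsed before the outer dictionary is serialized.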

If you would rather have dict objects than JSON strings, use json.loads to convert each string into a Python dict:

>>> import json
>>> dict2 = {"load": 123, "events": [json.loads(x) for x in df.toJSON().collect()]}
>>> dict2
{'load': 123, 'events': [{'user_id': '101', 'amount': '99.10', 'trans_date': '2019-06-04T00:00:00.000+05:30'}, {'user_id': '102', 'amount': '89.27', 'trans_date': '2019-06-04T00:00:00.000+05:30'}, {'user_id': '102', 'amount': '89.10', 'trans_date': '2019-03-04T00:00:00.000+05:30'}, {'user_id': '103', 'amount': '73.11', 'trans_date': '2019-09-10T00:00:00.000+05:30'}, {'user_id': '101', 'amount': '-69.81', 'trans_date': '2019-09-11T00:00:00.000+05:30'}, {'user_id': '101', 'amount': '12.51', 'trans_date': '2018-12-14T00:00:00.000+05:30'}, {'user_id': '101', 'amount': '43.23', 'trans_date': '2018-09-11T00:00:00.000+05:30'}]}
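Since the question is about sending the records to an API in batches, here is a rough sketch of how the same idea could be split into fixed-size request bodies. The helper name build_payloads and the batch_size parameter are hypothetical and do not come from the answer. The first two sample rows are shortened from the question and the third is made up so that the last batch is partial. The function accepts any iterable of JSON strings, so it runs without Spark.

```python
import json
from itertools import islice


def build_payloads(json_rows, load, batch_size):
    """Yield one serialized request body per batch (hypothetical helper)."""
    rows = iter(json_rows)
    while True:
        # Parse up to batch_size JSON strings into dicts.
        batch = [json.loads(row) for row in islice(rows, batch_size)]
        if not batch:
            return
        yield json.dumps({"load": load, "events": batch})


# Stand-in for df.toJSON().collect(); the third row is invented sample data.
sample_rows = [
    '{"event_type":"click","visitor_country":"CA","user_properties":{"distinct_id":123,"user_id":345}}',
    '{"event_type":"click","visitor_country":"US","user_properties":{"distinct_id":321,"user_id":543}}',
    '{"event_type":"click","visitor_country":"GB","user_properties":{"distinct_id":111,"user_id":222}}',
]

payloads = list(build_payloads(sample_rows, load=123, batch_size=2))
for body in payloads:
    print(body)
```

With a real DataFrame, the list returned by df.toJSON().collect() could be passed as json_rows. For a large DataFrame, collect() pulls every row onto the driver at once, so iterating with df.toJSON().toLocalIterator() may be a better fit, though that depends on the data size and the cluster.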
