【问题标题】:PySpark - RDD to JSONPySpark - RDD 到 JSON
【发布时间】:2018-06-25 14:57:39
【问题描述】:

我有一个 Hive 查询,它以这种格式返回数据:

ip, category, score
1.2.3.4, X, 5
10.10.10.10, A, 2
1.2.3.4, Y, 2
12.12.12.12, G, 10
1.2.3.4, Z, 9
10.10.10.10, X, 3

在 PySpark 中,我通过 hive_context.sql(my_query).rdd 获得此信息

每个 ip 地址可以有多个分数(因此有多个行)。我想以 json/array 格式获取这些数据,如下所示:

{
    "ip": "1.2.3.4",
    "scores": [
        {
            "category": "X",
             "score": 10
        },
        {
            "category": "Y",
             "score": 2
        },
        {
            "category": "Z",
             "score": 9
        },
    ],
    "ip": "10.10.10.10",
    "scores": [
        {
            "category": "A",
             "score": 2
        },
        {
            "category": "X",
             "score": 3
        },
    ],
     "ip": "12.12.12.12",
    "scores": [
        {
            "category": "G",
             "score": 10
        },
    ],
}

请注意,RDD 不一定是排序的,RDD 很容易包含几亿行。我是 PySpark 的新手,所以任何关于如何有效进行此操作的指示都会有所帮助。

【问题讨论】:

    标签: arrays json pyspark


    【解决方案1】:

    groupByip 然后将分组的 RDD 转换为您需要的:

    rdd.groupBy(lambda r: r.ip).map(
      lambda g: {
        'ip': g[0], 
        'scores': [{'category': x['category'], 'score': x['score']} for x in g[1]]}
    ).collect()
    
    # [{'ip': '1.2.3.4', 'scores': [{'category': 'X', 'score': 5}, {'category': 'Y', 'score': 2}, {'category': 'Z', 'score': 9}]}, {'ip': '12.12.12.12', 'scores': [{'category': 'G', 'score': 10}]}, {'ip': '10.10.10.10', 'scores': [{'category': 'A', 'score': 2}, {'category': 'X', 'score': 3}]}]
    

    【讨论】:

      猜你喜欢
      • 2017-09-18
      • 1970-01-01
      • 1970-01-01
      • 2016-08-15
      • 2017-04-10
      • 2019-12-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多